This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3e80977bf [GLUTEN-5249] [CH] fix throw Unexpected empty column when
reading csv file (#5254)
3e80977bf is described below
commit 3e80977bf6bd2f5fdd9fb3983c5261db60739194
Author: shuai.xu <[email protected]>
AuthorDate: Tue Apr 16 15:51:02 2024 +0800
[GLUTEN-5249] [CH] fix throw Unexpected empty column when reading csv file
(#5254)
What changes were proposed in this pull request?
This PR fixes an issue where, when a NativeScan's schema contains two identical columns, reading a csv file may throw "Unexpected empty column".
(Fixes: #5249)
How was this patch tested?
This patch was tested by unit tests.
---
.../execution/BasicScanExecTransformer.scala | 22 +++-
.../gluten/utils/velox/VeloxTestSettings.scala | 3 +-
.../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 102 ++++++++++++++++
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 128 +--------------------
.../execution/GlutenHiveSQLQuerySuiteBase.scala | 97 ++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 3 +-
.../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 102 ++++++++++++++++
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 115 +-----------------
.../execution/GlutenHiveSQLQuerySuiteBase.scala | 97 ++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 3 +-
.../hive/execution/GlutenHiveSQLQueryCHSuite.scala | 102 ++++++++++++++++
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 115 +-----------------
.../execution/GlutenHiveSQLQuerySuiteBase.scala | 97 ++++++++++++++++
13 files changed, 636 insertions(+), 350 deletions(-)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 6ec01af46..b0bc0ea7b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -81,7 +81,8 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
val scanTime = longMetric("scanTime")
val substraitContext = new SubstraitContext
val transformContext = doTransform(substraitContext)
- val outNames =
outputAttributes().map(ConverterUtils.genColumnNameWithExprId).asJava
+ val outNames =
+
filteRedundantField(outputAttributes()).map(ConverterUtils.genColumnNameWithExprId).asJava
val planNode =
PlanBuilder.makePlan(substraitContext,
Lists.newArrayList(transformContext.root), outNames)
@@ -133,7 +134,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
}
override def doTransform(context: SubstraitContext): TransformContext = {
- val output = outputAttributes()
+ val output = filteRedundantField(outputAttributes())
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val columnTypeNodes = output.map {
@@ -177,4 +178,21 @@ trait BasicScanExecTransformer extends
LeafTransformSupport with BaseDataSource
context.nextOperatorId(this.nodeName))
TransformContext(output, output, readNode)
}
+
+  /**
+   * Remove attributes whose column name duplicates an earlier attribute,
+   * keeping the first occurrence of each name. Duplicate columns in the
+   * scan schema caused "Unexpected empty column" when reading csv files
+   * (GLUTEN-5249).
+   *
+   * NOTE(review): the method name has a typo ("filte" -> "filter"); kept
+   * as-is because this patch's call sites already use this spelling.
+   * Dedup is by name only: attributes with equal names but different
+   * ExprIds are dropped, matching the original O(n^2) nested-loop logic,
+   * but done in O(n) with a seen-name set and no mutable vars.
+   */
+  def filteRedundantField(outputs: Seq[Attribute]): Seq[Attribute] = {
+    val seenNames = scala.collection.mutable.HashSet.empty[String]
+    outputs.filter(attr => seenNames.add(attr.name))
+  }
}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index ebe7f27bd..189a09d35 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -34,7 +34,7 @@ import
org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite,
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite,
GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite,
GlutenSessionExtensionSuite}
-import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
+import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQueryCHSuite,
GlutenHiveSQLQuerySuite}
import
org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite,
GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite,
GlutenDDLSourceLoadSuite,
GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite,
GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE,
GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite,
GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite,
GlutenPrunedScanSuite, GlutenResolve [...]
// Some settings' line length exceeds 100
@@ -1077,6 +1077,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenStatisticsCollectionSuite]
.exclude("SPARK-33687: analyze all tables in a specific database")
enableSuite[FallbackStrategiesSuite]
+ enableSuite[GlutenHiveSQLQueryCHSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
new file mode 100644
index 000000000..40569d0d9
--- /dev/null
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.{DebugFilesystem, SparkConf}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
+
+ override def sparkConf: SparkConf = {
+ defaultSparkConf
+ .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+ .set("spark.gluten.sql.columnar.backend.lib", "ch")
+ .set("spark.sql.storeAssignmentPolicy", "legacy")
+ .set("spark.default.parallelism", "1")
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "1024MB")
+ .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
+ }
+
+ testGluten("5182: Fix failed to parse post join filters") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ sql("DROP TABLE IF EXISTS test_5182_0;")
+ sql("DROP TABLE IF EXISTS test_5182_1;")
+ sql(
+ "CREATE TABLE test_5182_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
+ "status bigint, ts bigint, vm_typeid int) " +
+ "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
+ sql(
+ "CREATE TABLE test_5182_1 (typeid int, groupid int, ss_id bigint, " +
+ "ss_start_time bigint, ss_end_time bigint) " +
+ "USING hive OPTIONS(fileFormat 'parquet');")
+ sql(
+ "INSERT INTO test_5182_0 partition(day='2024-03-31') " +
+ "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+ sql("INSERT INTO test_5182_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+ val df = spark.sql(
+ "select ee.from_uid as uid,day, vgift_typeid, money from " +
+ "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+ "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+ "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+ "(select from_uid,day,vgift_typeid,vm_count,ts from test_5182_0 " +
+ "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
+ "left join test_5182_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+ "where t_b.groupid in (1,2)) ee where ss_id=1;")
+ checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5182_0"),
+ ignoreIfNotExists = true,
+ purge = false)
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5182_1"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
+ testGluten("5249: Reading csv may throw Unexpected empty column") {
+ withSQLConf(
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false"
+ ) {
+ sql("DROP TABLE IF EXISTS test_5249;")
+ sql(
+ "CREATE TABLE test_5249 (name STRING, uid STRING) " +
+ "ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
+ "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " +
+ "OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';")
+ sql("INSERT INTO test_5249 VALUES('name_1', 'id_1');")
+ val df = spark.sql(
+ "SELECT name, uid, count(distinct uid) total_uid_num from test_5249 " +
+ "group by name, uid with cube;")
+ checkAnswer(
+ df,
+ Seq(
+ Row("name_1", "id_1", 1),
+ Row("name_1", null, 1),
+ Row(null, "id_1", 1),
+ Row(null, null, 1)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5249"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 84c83aae3..6a10b0781 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -16,91 +16,12 @@
*/
package org.apache.spark.sql.hive.execution
-import org.apache.gluten.execution.TransformSupport
-
import org.apache.spark.SparkConf
-import org.apache.spark.internal.config
-import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
-import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-
-import scala.reflect.ClassTag
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
-class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
- private var _spark: SparkSession = null
-
- override def beforeAll(): Unit = {
- prepareWorkDir()
- if (_spark == null) {
- _spark =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
- }
-
- _spark.sparkContext.setLogLevel("info")
- }
-
- override protected def spark: SparkSession = _spark
-
- override def afterAll(): Unit = {
- try {
- super.afterAll()
- if (_spark != null) {
- try {
- _spark.sessionState.catalog.reset()
- } finally {
- _spark.stop()
- _spark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- doThreadPostAudit()
- }
- }
-
- protected def defaultSparkConf: SparkConf = {
- val conf = new SparkConf()
- .set("spark.master", "local[1]")
- .set("spark.sql.test", "")
- .set("spark.sql.testkey", "true")
- .set(SQLConf.CODEGEN_FALLBACK.key, "false")
- .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
- .set(
- HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
- "org.apache.spark.sql.hive.execution.PairSerDe")
- // SPARK-8910
- .set(UI_ENABLED, false)
- .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
- // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
- // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
-
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
- // Disable ConvertToLocalRelation for better test coverage. Test cases
built on
- // LocalRelation will exercise the optimization rules better by
disabling it as
- // this rule may potentially block testing of other optimization rules
such as
- // ConstantPropagation etc.
- .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
-
- conf.set(
- StaticSQLConf.WAREHOUSE_PATH,
- conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
- }
-
- /**
- * Get all the children plan of plans.
- *
- * @param plans
- * : the input plans.
- * @return
- */
-
- def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag:
ClassTag[T]): Unit = {
- val executedPlan = getExecutedPlan(df)
- assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
- }
+class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
override def sparkConf: SparkConf = {
defaultSparkConf
@@ -108,7 +29,6 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
.set("spark.default.parallelism", "1")
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "1024MB")
- .set("spark.gluten.sql.complexType.scan.fallback.enabled", "false")
}
testGluten("hive orc scan") {
@@ -129,7 +49,9 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
}
testGluten("avoid unnecessary filter binding for subfield during scan") {
- withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
sql("DROP TABLE IF EXISTS test_subfield")
sql(
"CREATE TABLE test_subfield (name STRING, favorite_color STRING, " +
@@ -147,42 +69,4 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
purge = false)
}
- testGluten("5182: Fix failed to parse post join filters") {
- withSQLConf(
- "spark.sql.hive.convertMetastoreParquet" -> "false",
- "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
- sql("DROP TABLE IF EXISTS test_5128_0;")
- sql("DROP TABLE IF EXISTS test_5128_1;")
- sql(
- "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
- "status bigint, ts bigint, vm_typeid int) " +
- "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
- sql(
- "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
- "ss_start_time bigint, ss_end_time bigint) " +
- "USING hive OPTIONS(fileFormat 'parquet');")
- sql(
- "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
- "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
- sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
- val df = spark.sql(
- "select ee.from_uid as uid,day, vgift_typeid, money from " +
- "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
- "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
- "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
- "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
- "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
- "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
- "where t_b.groupid in (1,2)) ee where ss_id=1;")
- checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
- }
- spark.sessionState.catalog.dropTable(
- TableIdentifier("test_5128_0"),
- ignoreIfNotExists = true,
- purge = false)
- spark.sessionState.catalog.dropTable(
- TableIdentifier("test_5128_1"),
- ignoreIfNotExists = true,
- purge = false)
- }
}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
new file mode 100644
index 000000000..c8540647d
--- /dev/null
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution
+
+import org.apache.gluten.execution.TransformSupport
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.UI.UI_ENABLED
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.internal.SQLConf
+
+import scala.reflect.ClassTag
+
+abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait {
+ private var _spark: SparkSession = null
+
+ override def beforeAll(): Unit = {
+ prepareWorkDir()
+ if (_spark == null) {
+ _spark =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
+ }
+
+ _spark.sparkContext.setLogLevel("warn")
+ }
+
+ override protected def spark: SparkSession = _spark
+
+ override def afterAll(): Unit = {
+ try {
+ super.afterAll()
+ if (_spark != null) {
+ try {
+ _spark.sessionState.catalog.reset()
+ } finally {
+ _spark.stop()
+ _spark = null
+ }
+ }
+ } finally {
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ doThreadPostAudit()
+ }
+ }
+
+ protected def defaultSparkConf: SparkConf = {
+ val conf = new SparkConf()
+ .set("spark.master", "local[1]")
+ .set("spark.sql.test", "")
+ .set("spark.sql.testkey", "true")
+ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+ .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
+ .set(
+ HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
+ "org.apache.spark.sql.hive.execution.PairSerDe")
+ // SPARK-8910
+ .set(UI_ENABLED, false)
+ .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
+ // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
+ // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
+
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
+ // Disable ConvertToLocalRelation for better test coverage. Test cases
built on
+ // LocalRelation will exercise the optimization rules better by
disabling it as
+ // this rule may potentially block testing of other optimization rules
such as
+ // ConstantPropagation etc.
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
+
+ conf.set(
+ "spark.sql.warehouse.dir",
+ getClass.getResource("/").getPath +
"/tests-working-home/spark-warehouse")
+ val metastore = getClass.getResource("/").getPath +
getClass.getCanonicalName + "/metastore_db"
+ conf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastore;create=true")
+ }
+
+ def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag:
ClassTag[T]): Unit = {
+ val executedPlan = getExecutedPlan(df)
+ assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
+ }
+}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 2abf3e1b8..79f182e24 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -37,7 +37,7 @@ import
org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite,
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite,
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
-import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
+import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQueryCHSuite,
GlutenHiveSQLQuerySuite}
import
org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite,
GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite,
GlutenDDLSourceLoadSuite,
GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite,
GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE,
GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite,
GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite,
GlutenPrunedScanSuite, GlutenResolve [...]
// Some settings' line length exceeds 100
@@ -1130,6 +1130,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("cases when literal is max")
enableSuite[GlutenXPathFunctionsSuite]
enableSuite[GlutenFallbackSuite]
+ enableSuite[GlutenHiveSQLQueryCHSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenImplicitsTest]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
new file mode 100644
index 000000000..40569d0d9
--- /dev/null
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.{DebugFilesystem, SparkConf}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
+
+ override def sparkConf: SparkConf = {
+ defaultSparkConf
+ .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+ .set("spark.gluten.sql.columnar.backend.lib", "ch")
+ .set("spark.sql.storeAssignmentPolicy", "legacy")
+ .set("spark.default.parallelism", "1")
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "1024MB")
+ .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
+ }
+
+ testGluten("5182: Fix failed to parse post join filters") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ sql("DROP TABLE IF EXISTS test_5182_0;")
+ sql("DROP TABLE IF EXISTS test_5182_1;")
+ sql(
+ "CREATE TABLE test_5182_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
+ "status bigint, ts bigint, vm_typeid int) " +
+ "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
+ sql(
+ "CREATE TABLE test_5182_1 (typeid int, groupid int, ss_id bigint, " +
+ "ss_start_time bigint, ss_end_time bigint) " +
+ "USING hive OPTIONS(fileFormat 'parquet');")
+ sql(
+ "INSERT INTO test_5182_0 partition(day='2024-03-31') " +
+ "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+ sql("INSERT INTO test_5182_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+ val df = spark.sql(
+ "select ee.from_uid as uid,day, vgift_typeid, money from " +
+ "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+ "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+ "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+ "(select from_uid,day,vgift_typeid,vm_count,ts from test_5182_0 " +
+ "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
+ "left join test_5182_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+ "where t_b.groupid in (1,2)) ee where ss_id=1;")
+ checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5182_0"),
+ ignoreIfNotExists = true,
+ purge = false)
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5182_1"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
+ testGluten("5249: Reading csv may throw Unexpected empty column") {
+ withSQLConf(
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false"
+ ) {
+ sql("DROP TABLE IF EXISTS test_5249;")
+ sql(
+ "CREATE TABLE test_5249 (name STRING, uid STRING) " +
+ "ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
+ "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " +
+ "OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';")
+ sql("INSERT INTO test_5249 VALUES('name_1', 'id_1');")
+ val df = spark.sql(
+ "SELECT name, uid, count(distinct uid) total_uid_num from test_5249 " +
+ "group by name, uid with cube;")
+ checkAnswer(
+ df,
+ Seq(
+ Row("name_1", "id_1", 1),
+ Row("name_1", null, 1),
+ Row(null, "id_1", 1),
+ Row(null, null, 1)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5249"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 50cd93841..923a1fa9f 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -16,84 +16,15 @@
*/
package org.apache.spark.sql.hive.execution
-import org.apache.gluten.execution.{FileSourceScanExecTransformer,
TransformSupport}
+import org.apache.gluten.execution.FileSourceScanExecTransformer
import org.apache.spark.SparkConf
-import org.apache.spark.internal.config
-import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
-import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
-import scala.reflect.ClassTag
-
-class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
- private var _spark: SparkSession = null
-
- override def beforeAll(): Unit = {
- prepareWorkDir()
- if (_spark == null) {
- _spark =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
- }
-
- _spark.sparkContext.setLogLevel("info")
- }
-
- override protected def spark: SparkSession = _spark
-
- override def afterAll(): Unit = {
- try {
- super.afterAll()
- if (_spark != null) {
- try {
- _spark.sessionState.catalog.reset()
- } finally {
- _spark.stop()
- _spark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- doThreadPostAudit()
- }
- }
-
- protected def defaultSparkConf: SparkConf = {
- val conf = new SparkConf()
- .set("spark.master", "local[1]")
- .set("spark.sql.test", "")
- .set("spark.sql.testkey", "true")
- .set(SQLConf.CODEGEN_FALLBACK.key, "false")
- .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
- .set(
- HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
- "org.apache.spark.sql.hive.execution.PairSerDe")
- // SPARK-8910
- .set(UI_ENABLED, false)
- .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
- // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
- // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
-
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
- // Disable ConvertToLocalRelation for better test coverage. Test cases
built on
- // LocalRelation will exercise the optimization rules better by
disabling it as
- // this rule may potentially block testing of other optimization rules
such as
- // ConstantPropagation etc.
- .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
-
- conf.set(
- StaticSQLConf.WAREHOUSE_PATH,
- conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
- }
-
- def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag:
ClassTag[T]): Unit = {
- val executedPlan = getExecutedPlan(df)
- assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
- }
+class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
override def sparkConf: SparkConf = {
defaultSparkConf
@@ -204,42 +135,4 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
purge = false)
}
- testGluten("5182: Fix failed to parse post join filters") {
- withSQLConf(
- "spark.sql.hive.convertMetastoreParquet" -> "false",
- "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
- sql("DROP TABLE IF EXISTS test_5128_0;")
- sql("DROP TABLE IF EXISTS test_5128_1;")
- sql(
- "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
- "status bigint, ts bigint, vm_typeid int) " +
- "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
- sql(
- "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
- "ss_start_time bigint, ss_end_time bigint) " +
- "USING hive OPTIONS(fileFormat 'parquet');")
- sql(
- "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
- "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
- sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
- val df = spark.sql(
- "select ee.from_uid as uid,day, vgift_typeid, money from " +
- "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
- "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
- "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
- "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
- "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
- "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
- "where t_b.groupid in (1,2)) ee where ss_id=1;")
- checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
- }
- spark.sessionState.catalog.dropTable(
- TableIdentifier("test_5128_0"),
- ignoreIfNotExists = true,
- purge = false)
- spark.sessionState.catalog.dropTable(
- TableIdentifier("test_5128_1"),
- ignoreIfNotExists = true,
- purge = false)
- }
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
new file mode 100644
index 000000000..c8540647d
--- /dev/null
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution
+
+import org.apache.gluten.execution.TransformSupport
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.UI.UI_ENABLED
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.internal.SQLConf
+
+import scala.reflect.ClassTag
+
+abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait {
+ private var _spark: SparkSession = null
+
+ override def beforeAll(): Unit = {
+ prepareWorkDir()
+ if (_spark == null) {
+ _spark =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
+ }
+
+ _spark.sparkContext.setLogLevel("warn")
+ }
+
+ override protected def spark: SparkSession = _spark
+
+ override def afterAll(): Unit = {
+ try {
+ super.afterAll()
+ if (_spark != null) {
+ try {
+ _spark.sessionState.catalog.reset()
+ } finally {
+ _spark.stop()
+ _spark = null
+ }
+ }
+ } finally {
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ doThreadPostAudit()
+ }
+ }
+
+ protected def defaultSparkConf: SparkConf = {
+ val conf = new SparkConf()
+ .set("spark.master", "local[1]")
+ .set("spark.sql.test", "")
+ .set("spark.sql.testkey", "true")
+ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+ .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
+ .set(
+ HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
+ "org.apache.spark.sql.hive.execution.PairSerDe")
+ // SPARK-8910
+ .set(UI_ENABLED, false)
+ .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
+ // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
+ // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
+
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
+ // Disable ConvertToLocalRelation for better test coverage. Test cases
built on
+ // LocalRelation will exercise the optimization rules better by
disabling it as
+ // this rule may potentially block testing of other optimization rules
such as
+ // ConstantPropagation etc.
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
+
+ conf.set(
+ "spark.sql.warehouse.dir",
+ getClass.getResource("/").getPath +
"/tests-working-home/spark-warehouse")
+ val metastore = getClass.getResource("/").getPath +
getClass.getCanonicalName + "/metastore_db"
+ conf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastore;create=true")
+ }
+
+ def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag:
ClassTag[T]): Unit = {
+ val executedPlan = getExecutedPlan(df)
+ assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
+ }
+}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 0488e1148..9d297e4ea 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -37,7 +37,7 @@ import
org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite,
GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import
org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite,
GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
-import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
+import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQueryCHSuite,
GlutenHiveSQLQuerySuite}
import
org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite,
GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite,
GlutenDDLSourceLoadSuite,
GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite,
GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE,
GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite,
GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite,
GlutenPrunedScanSuite, GlutenResolve [...]
// Some settings' line length exceeds 100
@@ -1143,6 +1143,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("cases when literal is max")
enableSuite[GlutenXPathFunctionsSuite]
enableSuite[GlutenFallbackSuite]
+ enableSuite[GlutenHiveSQLQueryCHSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
new file mode 100644
index 000000000..40569d0d9
--- /dev/null
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQueryCHSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.{DebugFilesystem, SparkConf}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
+
+ override def sparkConf: SparkConf = {
+ defaultSparkConf
+ .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+ .set("spark.gluten.sql.columnar.backend.lib", "ch")
+ .set("spark.sql.storeAssignmentPolicy", "legacy")
+ .set("spark.default.parallelism", "1")
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "1024MB")
+ .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
+ }
+
+ testGluten("5182: Fix failed to parse post join filters") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ sql("DROP TABLE IF EXISTS test_5182_0;")
+ sql("DROP TABLE IF EXISTS test_5182_1;")
+ sql(
+ "CREATE TABLE test_5182_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
+ "status bigint, ts bigint, vm_typeid int) " +
+ "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
+ sql(
+ "CREATE TABLE test_5182_1 (typeid int, groupid int, ss_id bigint, " +
+ "ss_start_time bigint, ss_end_time bigint) " +
+ "USING hive OPTIONS(fileFormat 'parquet');")
+ sql(
+ "INSERT INTO test_5182_0 partition(day='2024-03-31') " +
+ "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+ sql("INSERT INTO test_5182_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+ val df = spark.sql(
+ "select ee.from_uid as uid,day, vgift_typeid, money from " +
+ "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+ "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+ "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+ "(select from_uid,day,vgift_typeid,vm_count,ts from test_5182_0 " +
+ "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
+ "left join test_5182_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+ "where t_b.groupid in (1,2)) ee where ss_id=1;")
+ checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5182_0"),
+ ignoreIfNotExists = true,
+ purge = false)
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5182_1"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
+ testGluten("5249: Reading csv may throw Unexpected empty column") {
+ withSQLConf(
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false"
+ ) {
+ sql("DROP TABLE IF EXISTS test_5249;")
+ sql(
+ "CREATE TABLE test_5249 (name STRING, uid STRING) " +
+ "ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
+ "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " +
+ "OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';")
+ sql("INSERT INTO test_5249 VALUES('name_1', 'id_1');")
+ val df = spark.sql(
+ "SELECT name, uid, count(distinct uid) total_uid_num from test_5249 " +
+ "group by name, uid with cube;")
+ checkAnswer(
+ df,
+ Seq(
+ Row("name_1", "id_1", 1),
+ Row("name_1", null, 1),
+ Row(null, "id_1", 1),
+ Row(null, null, 1)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5249"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 15a649acf..b348f6719 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -16,83 +16,12 @@
*/
package org.apache.spark.sql.hive.execution
-import org.apache.gluten.execution.TransformSupport
-
import org.apache.spark.SparkConf
-import org.apache.spark.internal.config
-import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
-import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-
-import scala.reflect.ClassTag
-
-class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
- private var _spark: SparkSession = null
-
- override def beforeAll(): Unit = {
- prepareWorkDir()
- if (_spark == null) {
- _spark =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
- }
-
- _spark.sparkContext.setLogLevel("info")
- }
-
- override protected def spark: SparkSession = _spark
-
- override def afterAll(): Unit = {
- try {
- super.afterAll()
- if (_spark != null) {
- try {
- _spark.sessionState.catalog.reset()
- } finally {
- _spark.stop()
- _spark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- doThreadPostAudit()
- }
- }
-
- protected def defaultSparkConf: SparkConf = {
- val conf = new SparkConf()
- .set("spark.master", "local[1]")
- .set("spark.sql.test", "")
- .set("spark.sql.testkey", "true")
- .set(SQLConf.CODEGEN_FALLBACK.key, "false")
- .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
- .set(
- HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
- "org.apache.spark.sql.hive.execution.PairSerDe")
- // SPARK-8910
- .set(UI_ENABLED, false)
- .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
- // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
- // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
-
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
- // Disable ConvertToLocalRelation for better test coverage. Test cases
built on
- // LocalRelation will exercise the optimization rules better by
disabling it as
- // this rule may potentially block testing of other optimization rules
such as
- // ConstantPropagation etc.
- .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
-
- conf.set(
- StaticSQLConf.WAREHOUSE_PATH,
- conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
- }
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
- def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag:
ClassTag[T]): Unit = {
- val executedPlan = getExecutedPlan(df)
- assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
- }
+class GlutenHiveSQLQuerySuite extends GlutenHiveSQLQuerySuiteBase {
override def sparkConf: SparkConf = {
defaultSparkConf
@@ -119,42 +48,4 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
purge = false)
}
- testGluten("5182: Fix failed to parse post join filters") {
- withSQLConf(
- "spark.sql.hive.convertMetastoreParquet" -> "false",
- "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
- sql("DROP TABLE IF EXISTS test_5128_0;")
- sql("DROP TABLE IF EXISTS test_5128_1;")
- sql(
- "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
- "status bigint, ts bigint, vm_typeid int) " +
- "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
- sql(
- "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
- "ss_start_time bigint, ss_end_time bigint) " +
- "USING hive OPTIONS(fileFormat 'parquet');")
- sql(
- "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
- "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
- sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
- val df = spark.sql(
- "select ee.from_uid as uid,day, vgift_typeid, money from " +
- "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
- "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
- "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
- "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
- "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
- "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
- "where t_b.groupid in (1,2)) ee where ss_id=1;")
- checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
- }
- spark.sessionState.catalog.dropTable(
- TableIdentifier("test_5128_0"),
- ignoreIfNotExists = true,
- purge = false)
- spark.sessionState.catalog.dropTable(
- TableIdentifier("test_5128_1"),
- ignoreIfNotExists = true,
- purge = false)
- }
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
new file mode 100644
index 000000000..c8540647d
--- /dev/null
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuiteBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution
+
+import org.apache.gluten.execution.TransformSupport
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.UI.UI_ENABLED
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.internal.SQLConf
+
+import scala.reflect.ClassTag
+
+abstract class GlutenHiveSQLQuerySuiteBase extends GlutenSQLTestsTrait {
+ private var _spark: SparkSession = null
+
+ override def beforeAll(): Unit = {
+ prepareWorkDir()
+ if (_spark == null) {
+ _spark =
SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
+ }
+
+ _spark.sparkContext.setLogLevel("warn")
+ }
+
+ override protected def spark: SparkSession = _spark
+
+ override def afterAll(): Unit = {
+ try {
+ super.afterAll()
+ if (_spark != null) {
+ try {
+ _spark.sessionState.catalog.reset()
+ } finally {
+ _spark.stop()
+ _spark = null
+ }
+ }
+ } finally {
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ doThreadPostAudit()
+ }
+ }
+
+ protected def defaultSparkConf: SparkConf = {
+ val conf = new SparkConf()
+ .set("spark.master", "local[1]")
+ .set("spark.sql.test", "")
+ .set("spark.sql.testkey", "true")
+ .set(SQLConf.CODEGEN_FALLBACK.key, "false")
+ .set(SQLConf.CODEGEN_FACTORY_MODE.key,
CodegenObjectFactoryMode.CODEGEN_ONLY.toString)
+ .set(
+ HiveUtils.HIVE_METASTORE_BARRIER_PREFIXES.key,
+ "org.apache.spark.sql.hive.execution.PairSerDe")
+ // SPARK-8910
+ .set(UI_ENABLED, false)
+ .set(config.UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
+ // Hive changed the default of
hive.metastore.disallow.incompatible.col.type.changes
+ // from false to true. For details, see the JIRA HIVE-12320 and
HIVE-17764.
+
.set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes",
"false")
+ // Disable ConvertToLocalRelation for better test coverage. Test cases
built on
+ // LocalRelation will exercise the optimization rules better by
disabling it as
+ // this rule may potentially block testing of other optimization rules
such as
+ // ConstantPropagation etc.
+ .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
ConvertToLocalRelation.ruleName)
+
+ conf.set(
+ "spark.sql.warehouse.dir",
+ getClass.getResource("/").getPath +
"/tests-working-home/spark-warehouse")
+ val metastore = getClass.getResource("/").getPath +
getClass.getCanonicalName + "/metastore_db"
+ conf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastore;create=true")
+ }
+
+ def checkOperatorMatch[T <: TransformSupport](df: DataFrame)(implicit tag:
ClassTag[T]): Unit = {
+ val executedPlan = getExecutedPlan(df)
+ assert(executedPlan.exists(plan => plan.getClass == tag.runtimeClass))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]