This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f41129ec7 [GLUTEN-6067][CH][MINOR][UT] Followup 6623, fix
backends-clickhouse ut issue in CI (#6891)
f41129ec7 is described below
commit f41129ec70bb99bba52a1a566da254d75b7cb2c8
Author: Chang chen <[email protected]>
AuthorDate: Sat Aug 17 18:32:55 2024 +0800
[GLUTEN-6067][CH][MINOR][UT] Followup 6623, fix backends-clickhouse ut
issue in CI (#6891)
* fix fallback in spark 3.5
* Remove hive support in GlutenClickhouseFunctionSuite
* Move Hive related suite into hive package
* fix ut for spark 35
* fix celeborn ut for spark 35
* fix gluten ut for spark 35
* remove duplicated dependency
* fix dependency for spark 3.5 ut
---
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 21 ++-
...lutenClickHouseWholeStageTransformerSuite.scala | 12 +-
.../GlutenClickhouseCountDistinctSuite.scala | 3 +-
.../execution/GlutenClickhouseFunctionSuite.scala | 152 +++++++--------------
.../GlutenClickHouseHiveTableSuite.scala | 97 ++-----------
.../GlutenClickHouseNativeWriteTableSuite.scala | 51 +------
.../GlutenClickHouseTableAfterRestart.scala | 83 ++++-------
.../execution/hive/ReCreateHiveSession.scala | 69 ++++++++++
.../parquet/GlutenParquetFilterSuite.scala | 6 -
.../gluten/test/AllDataTypesWithComplexType.scala | 67 +++++++++
gluten-celeborn/clickhouse/pom.xml | 32 +++++
.../org/apache/gluten/test/FallbackUtil.scala | 29 ++--
gluten-ut/pom.xml | 2 +-
gluten-ut/spark35/pom.xml | 37 ++++-
gluten-ut/test/pom.xml | 39 +++++-
15 files changed, 354 insertions(+), 346 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 6a473cc54..87e95cbe9 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -188,20 +188,33 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
var metadataGlutenExist: Boolean = false
var metadataBinExist: Boolean = false
var dataBinExist: Boolean = false
+ var hasCommits = false
client
.listObjects(args)
.forEach(
obj => {
objectCount += 1
- if (obj.get().objectName().contains("metadata.gluten")) {
+ val objectName = obj.get().objectName()
+ if (objectName.contains("metadata.gluten")) {
metadataGlutenExist = true
- } else if (obj.get().objectName().contains("meta.bin")) {
+ } else if (objectName.contains("meta.bin")) {
metadataBinExist = true
- } else if (obj.get().objectName().contains("data.bin")) {
+ } else if (objectName.contains("data.bin")) {
dataBinExist = true
+ } else if (objectName.contains("_commits")) {
+ // Spark 35 has _commits directory
+ // table/_delta_log/_commits/
+ hasCommits = true
}
})
- assertResult(5)(objectCount)
+
+ if (isSparkVersionGE("3.5")) {
+ assertResult(6)(objectCount)
+ assert(hasCommits)
+ } else {
+ assertResult(5)(objectCount)
+ }
+
assert(metadataGlutenExist)
assert(metadataBinExist)
assert(dataBinExist)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 497286115..f914eaa18 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -178,11 +178,13 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
super.beforeAll()
}
- protected val rootPath: String = this.getClass.getResource("/").getPath
- protected val basePath: String = rootPath + "tests-working-home"
- protected val warehouse: String = basePath + "/spark-warehouse"
- protected val metaStorePathAbsolute: String = basePath + "/meta"
- protected val hiveMetaStoreDB: String = metaStorePathAbsolute +
"/metastore_db"
+ final protected val rootPath: String = this.getClass.getResource("/").getPath
+ final protected val basePath: String = rootPath + "tests-working-home"
+ final protected val warehouse: String = basePath + "/spark-warehouse"
+ final protected val metaStorePathAbsolute: String = basePath + "/meta"
+
+ protected val hiveMetaStoreDB: String =
+ s"$metaStorePathAbsolute/${getClass.getSimpleName}/metastore_db"
final override protected val resourcePath: String = "" // ch not need this
override protected val fileFormat: String = "parquet"
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
index 28ff5874f..383681733 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseCountDistinctSuite.scala
@@ -16,7 +16,8 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.test.AllDataTypesWithComplexType
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
import org.apache.spark.SparkConf
class GlutenClickhouseCountDistinctSuite extends
GlutenClickHouseWholeStageTransformerSuite {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala
index 1d4d1b6f8..ac18f256e 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickhouseFunctionSuite.scala
@@ -20,12 +20,6 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.UTSystemParameters
import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.DeltaLog
-
-import org.apache.commons.io.FileUtils
-
-import java.io.File
class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
override protected val needCopyParquetToTablePath = true
@@ -39,9 +33,6 @@ class GlutenClickhouseFunctionSuite extends
GlutenClickHouseTPCHAbstractSuite {
createNotNullTPCHTablesInParquet(tablesPath)
}
- private var _hiveSpark: SparkSession = _
- override protected def spark: SparkSession = _hiveSpark
-
override protected def sparkConf: SparkConf = {
new SparkConf()
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -69,70 +60,21 @@ class GlutenClickhouseFunctionSuite extends
GlutenClickHouseTPCHAbstractSuite {
.setMaster("local[1]")
}
- override protected def initializeSession(): Unit = {
- if (_hiveSpark == null) {
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
- _hiveSpark = SparkSession
- .builder()
- .config(sparkConf)
- .enableHiveSupport()
- .config(
- "javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
- .getOrCreate()
- }
- }
-
- override def beforeAll(): Unit = {
- // prepare working paths
- val basePathDir = new File(basePath)
- if (basePathDir.exists()) {
- FileUtils.forceDelete(basePathDir)
- }
- FileUtils.forceMkdir(basePathDir)
- FileUtils.forceMkdir(new File(warehouse))
- FileUtils.forceMkdir(new File(metaStorePathAbsolute))
- FileUtils.copyDirectory(new File(rootPath + resourcePath), new
File(tablesPath))
- super.beforeAll()
- }
-
- override protected def afterAll(): Unit = {
- DeltaLog.clearCache()
-
- try {
- super.afterAll()
- } finally {
- try {
- if (_hiveSpark != null) {
- try {
- _hiveSpark.sessionState.catalog.reset()
- } finally {
- _hiveSpark.stop()
- _hiveSpark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- }
- }
- }
-
test("test uuid - write and read") {
withSQLConf(
("spark.gluten.sql.native.writer.enabled", "true"),
(GlutenConfig.GLUTEN_ENABLED.key, "true")) {
+ withTable("uuid_test") {
+ spark.sql("create table if not exists uuid_test (id string) using
parquet")
- spark.sql("drop table if exists uuid_test")
- spark.sql("create table if not exists uuid_test (id string) stored as
parquet")
-
- val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from
range(1)")
- df.cache()
- df.write.insertInto("uuid_test")
+ val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from
range(1)")
+ df.cache()
+ df.write.insertInto("uuid_test")
- val df2 = spark.table("uuid_test")
- val diffCount = df.exceptAll(df2).count()
- assert(diffCount == 0)
+ val df2 = spark.table("uuid_test")
+ val diffCount = df.exceptAll(df2).count()
+ assert(diffCount == 0)
+ }
}
}
@@ -181,49 +123,51 @@ class GlutenClickhouseFunctionSuite extends
GlutenClickHouseTPCHAbstractSuite {
}
test("GLUTEN-5981 null value from get_json_object") {
- spark.sql("create table json_t1 (a string) using parquet")
- spark.sql("insert into json_t1 values ('{\"a\":null}')")
- runQueryAndCompare(
- """
- |SELECT get_json_object(a, '$.a') is null from json_t1
- |""".stripMargin
- )(df => checkFallbackOperators(df, 0))
- spark.sql("drop table json_t1")
+ withTable("json_t1") {
+ spark.sql("create table json_t1 (a string) using parquet")
+ spark.sql("insert into json_t1 values ('{\"a\":null}')")
+ runQueryAndCompare(
+ """
+ |SELECT get_json_object(a, '$.a') is null from json_t1
+ |""".stripMargin
+ )(df => checkFallbackOperators(df, 0))
+ }
}
test("Fix arrayDistinct(Array(Nullable(Decimal))) core dump") {
- val create_sql =
- """
- |create table if not exists test(
- | dec array<decimal(10, 2)>
- |) using parquet
- |""".stripMargin
- val fill_sql =
- """
- |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
- |""".stripMargin
- val query_sql =
- """
- |select array_distinct(dec) from test;
- |""".stripMargin
- spark.sql(create_sql)
- spark.sql(fill_sql)
- compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
- spark.sql("drop table test")
+ withTable("json_t1") {
+ val create_sql =
+ """
+ |create table if not exists test(
+ | dec array<decimal(10, 2)>
+ |) using parquet
+ |""".stripMargin
+ val fill_sql =
+ """
+ |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
+ |""".stripMargin
+ val query_sql =
+ """
+ |select array_distinct(dec) from test;
+ |""".stripMargin
+ spark.sql(create_sql)
+ spark.sql(fill_sql)
+ compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
+ }
}
test("intersect all") {
- spark.sql("create table t1 (a int, b string) using parquet")
- spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5,
'5'),(6, '6')")
- spark.sql("create table t2 (a int, b string) using parquet")
- spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8,
'8'),(9, '9')")
- runQueryAndCompare(
- """
- |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
- |""".stripMargin
- )(df => checkFallbackOperators(df, 0))
- spark.sql("drop table t1")
- spark.sql("drop table t2")
+ withTable("t1", "t2") {
+ spark.sql("create table t1 (a int, b string) using parquet")
+ spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5,
'5'),(6, '6')")
+ spark.sql("create table t2 (a int, b string) using parquet")
+ spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8,
'8'),(9, '9')")
+ runQueryAndCompare(
+ """
+ |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
+ |""".stripMargin
+ )(df => checkFallbackOperators(df, 0))
+ }
}
test("array decimal32 CH column to row") {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
similarity index 94%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index 83bc4e76b..cc9155613 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive
import org.apache.gluten.GlutenConfig
+import
org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite,
ProjectExecTransformer, TransformSupport}
+import org.apache.gluten.test.AllDataTypesWithComplexType
import org.apache.gluten.utils.UTSystemParameters
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -29,64 +31,14 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.hadoop.fs.Path
import java.io.{File, PrintWriter}
-import java.sql.{Date, Timestamp}
import scala.reflect.ClassTag
-case class AllDataTypesWithComplexType(
- string_field: String = null,
- int_field: java.lang.Integer = null,
- long_field: java.lang.Long = null,
- float_field: java.lang.Float = null,
- double_field: java.lang.Double = null,
- short_field: java.lang.Short = null,
- byte_field: java.lang.Byte = null,
- boolean_field: java.lang.Boolean = null,
- decimal_field: java.math.BigDecimal = null,
- date_field: java.sql.Date = null,
- timestamp_field: java.sql.Timestamp = null,
- array: Seq[Int] = null,
- arrayContainsNull: Seq[Option[Int]] = null,
- map: Map[Int, Long] = null,
- mapValueContainsNull: Map[Int, Option[Long]] = null
-)
-
-object AllDataTypesWithComplexType {
- def genTestData(): Seq[AllDataTypesWithComplexType] = {
- (0 to 199).map {
- i =>
- if (i % 100 == 1) {
- AllDataTypesWithComplexType()
- } else {
- AllDataTypesWithComplexType(
- s"$i",
- i,
- i.toLong,
- i.toFloat,
- i.toDouble,
- i.toShort,
- i.toByte,
- i % 2 == 0,
- new java.math.BigDecimal(i + ".56"),
- Date.valueOf(new
Date(System.currentTimeMillis()).toLocalDate.plusDays(i % 10)),
- Timestamp.valueOf(
- new
Timestamp(System.currentTimeMillis()).toLocalDateTime.plusDays(i % 10)),
- Seq.apply(i + 1, i + 2, i + 3),
- Seq.apply(Option.apply(i + 1), Option.empty, Option.apply(i + 3)),
- Map.apply((i + 1, i + 2), (i + 3, i + 4)),
- Map.empty
- )
- }
- }
- }
-}
-
class GlutenClickHouseHiveTableSuite
extends GlutenClickHouseWholeStageTransformerSuite
+ with ReCreateHiveSession
with AdaptiveSparkPlanHelper {
- private var _hiveSpark: SparkSession = _
-
override protected def sparkConf: SparkConf = {
new SparkConf()
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -119,22 +71,6 @@ class GlutenClickHouseHiveTableSuite
.setMaster("local[*]")
}
- override protected def spark: SparkSession = _hiveSpark
-
- override protected def initializeSession(): Unit = {
- if (_hiveSpark == null) {
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
- _hiveSpark = SparkSession
- .builder()
- .config(sparkConf)
- .enableHiveSupport()
- .config(
- "javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
- .getOrCreate()
- }
- }
-
private val txt_table_name = "hive_txt_test"
private val txt_user_define_input = "hive_txt_user_define_input"
private val json_table_name = "hive_json_test"
@@ -235,24 +171,7 @@ class GlutenClickHouseHiveTableSuite
override protected def afterAll(): Unit = {
DeltaLog.clearCache()
-
- try {
- super.afterAll()
- } finally {
- try {
- if (_hiveSpark != null) {
- try {
- _hiveSpark.sessionState.catalog.reset()
- } finally {
- _hiveSpark.stop()
- _hiveSpark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- }
- }
+ super.afterAll()
}
test("test hive text table") {
@@ -957,7 +876,7 @@ class GlutenClickHouseHiveTableSuite
val select_sql_4 = "select id, get_json_object(data, '$.v111') from
test_tbl_3337"
val select_sql_5 = "select id, get_json_object(data, 'v112') from
test_tbl_3337"
val select_sql_6 =
- "select id, get_json_object(data, '$.id') from test_tbl_3337 where id =
123";
+ "select id, get_json_object(data, '$.id') from test_tbl_3337 where id =
123"
compareResultsAgainstVanillaSpark(select_sql_1, compareResult = true, _ =>
{})
compareResultsAgainstVanillaSpark(select_sql_2, compareResult = true, _ =>
{})
compareResultsAgainstVanillaSpark(select_sql_3, compareResult = true, _ =>
{})
@@ -1311,7 +1230,7 @@ class GlutenClickHouseHiveTableSuite
.format(dataPath)
val select_sql = "select * from test_tbl_6506"
spark.sql(create_table_sql)
- compareResultsAgainstVanillaSpark(select_sql, true, _ => {})
+ compareResultsAgainstVanillaSpark(select_sql, compareResult = true, _ =>
{})
spark.sql("drop table test_tbl_6506")
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
similarity index 96%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
index 652b15fc2..9e3fa0078 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
@@ -14,33 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
import org.apache.gluten.utils.UTSystemParameters
import org.apache.spark.SparkConf
import org.apache.spark.gluten.NativeWriteChecker
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{DecimalType, LongType, StringType,
StructField, StructType}
-
-import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.types._
import scala.reflect.runtime.universe.TypeTag
class GlutenClickHouseNativeWriteTableSuite
extends GlutenClickHouseWholeStageTransformerSuite
with AdaptiveSparkPlanHelper
- with SharedSparkSession
- with BeforeAndAfterAll
+ with ReCreateHiveSession
with NativeWriteChecker {
- private var _hiveSpark: SparkSession = _
-
override protected def sparkConf: SparkConf = {
var sessionTimeZone = "GMT"
if (isSparkVersionGE("3.5")) {
@@ -80,45 +74,12 @@ class GlutenClickHouseNativeWriteTableSuite
basePath + "/中文/spark-warehouse"
}
- override protected def spark: SparkSession = _hiveSpark
-
- override protected def initializeSession(): Unit = {
- if (_hiveSpark == null) {
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
- _hiveSpark = SparkSession
- .builder()
- .config(sparkConf)
- .enableHiveSupport()
- .config(
- "javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
- .getOrCreate()
- }
- }
-
private val table_name_template = "hive_%s_test"
private val table_name_vanilla_template = "hive_%s_test_written_by_vanilla"
override protected def afterAll(): Unit = {
DeltaLog.clearCache()
-
- try {
- super.afterAll()
- } finally {
- try {
- if (_hiveSpark != null) {
- try {
- _hiveSpark.sessionState.catalog.reset()
- } finally {
- _hiveSpark.stop()
- _hiveSpark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- }
- }
+ super.afterAll()
}
def getColumnName(s: String): String = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
similarity index 87%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
index f9e831cb4..d359428d0 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive
+
+import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
-import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog}
+import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -33,7 +35,8 @@ import java.io.File
// This suite is to make sure clickhouse commands works well even after spark
restart
class GlutenClickHouseTableAfterRestart
extends GlutenClickHouseTPCHAbstractSuite
- with AdaptiveSparkPlanHelper {
+ with AdaptiveSparkPlanHelper
+ with ReCreateHiveSession {
override protected val needCopyParquetToTablePath = true
@@ -64,56 +67,18 @@ class GlutenClickHouseTableAfterRestart
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
"8192")
+ .setMaster("local[2]")
}
override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
}
- private var _hiveSpark: SparkSession = _
- override protected def spark: SparkSession = _hiveSpark
-
- override protected def initializeSession(): Unit = {
- if (_hiveSpark == null) {
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_" +
current_db_num
- current_db_num += 1
-
- _hiveSpark = SparkSession
- .builder()
- .config(sparkConf)
- .enableHiveSupport()
- .config(
- "javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
- .master("local[2]")
- .getOrCreate()
- }
- }
-
- override protected def afterAll(): Unit = {
- DeltaLog.clearCache()
-
- try {
- super.afterAll()
- } finally {
- try {
- if (_hiveSpark != null) {
- try {
- _hiveSpark.sessionState.catalog.reset()
- } finally {
- _hiveSpark.stop()
- _hiveSpark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- }
- }
- }
-
var current_db_num: Int = 0
+ override protected val hiveMetaStoreDB: String =
+ metaStorePathAbsolute + "/metastore_db_" + current_db_num
+
test("test mergetree after restart") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree;
@@ -347,22 +312,22 @@ class GlutenClickHouseTableAfterRestart
SparkSession.clearDefaultSession()
}
- val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_"
+ val metaStoreDB = metaStorePathAbsolute + "/metastore_db_"
// use metastore_db2 to avoid issue: "Another instance of Derby may have
already booted the database"
- val destDir = new File(hiveMetaStoreDB + current_db_num)
- destDir.mkdirs()
- FileUtils.copyDirectory(new File(hiveMetaStoreDB + (current_db_num - 1)),
destDir)
- _hiveSpark = null
- _hiveSpark = SparkSession
- .builder()
- .config(sparkConf)
- .enableHiveSupport()
- .config(
- "javax.jdo.option.ConnectionURL",
- s"jdbc:derby:;databaseName=$hiveMetaStoreDB$current_db_num")
- .master("local[2]")
- .getOrCreate()
current_db_num += 1
+ val destDir = new File(metaStoreDB + current_db_num)
+ destDir.mkdirs()
+ FileUtils.copyDirectory(new File(metaStoreDB + (current_db_num - 1)),
destDir)
+ updateHiveSession(
+ SparkSession
+ .builder()
+ .config(sparkConf)
+ .enableHiveSupport()
+ .config(
+ "javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$metaStoreDB$current_db_num")
+ .getOrCreate()
+ )
}
}
// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/ReCreateHiveSession.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/ReCreateHiveSession.scala
new file mode 100644
index 000000000..c251e4636
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/ReCreateHiveSession.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SharedSparkSession
+
+import org.scalatest.BeforeAndAfterAll
+
+trait ReCreateHiveSession extends SharedSparkSession with BeforeAndAfterAll {
+
+ protected val hiveMetaStoreDB: String
+
+ private var _hiveSpark: SparkSession = _
+
+ override protected def spark: SparkSession = _hiveSpark
+
+ override protected def initializeSession(): Unit = {
+ if (_hiveSpark == null) {
+ _hiveSpark = SparkSession
+ .builder()
+ .config(sparkConf)
+ .enableHiveSupport()
+ .config(
+ "javax.jdo.option.ConnectionURL",
+ s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
+ .getOrCreate()
+ }
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ super.afterAll()
+ } finally {
+ try {
+ if (_hiveSpark != null) {
+ try {
+ _hiveSpark.sessionState.catalog.reset()
+ } finally {
+ _hiveSpark.stop()
+ _hiveSpark = null
+ }
+ }
+ } finally {
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ }
+ }
+ }
+
+ protected def updateHiveSession(newSession: SparkSession): Unit = {
+ _hiveSpark = null
+ _hiveSpark = newSession
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
index 1e6509c00..0a8d1729c 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/parquet/GlutenParquetFilterSuite.scala
@@ -35,12 +35,6 @@ class GlutenParquetFilterSuite
with GlutenTPCHBase
with Logging {
- override protected val rootPath = this.getClass.getResource("/").getPath
- override protected val basePath = rootPath + "tests-working-home"
- override protected val warehouse = basePath + "/spark-warehouse"
- override protected val metaStorePathAbsolute = basePath + "/meta"
- override protected val hiveMetaStoreDB = metaStorePathAbsolute +
"/metastore_db"
-
private val tpchQueriesResourceFolder: String =
rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/test/AllDataTypesWithComplexType.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/test/AllDataTypesWithComplexType.scala
new file mode 100644
index 000000000..19abcbea4
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/test/AllDataTypesWithComplexType.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.test
+
+import java.sql.{Date, Timestamp}
+
+case class AllDataTypesWithComplexType(
+ string_field: String = null,
+ int_field: java.lang.Integer = null,
+ long_field: java.lang.Long = null,
+ float_field: java.lang.Float = null,
+ double_field: java.lang.Double = null,
+ short_field: java.lang.Short = null,
+ byte_field: java.lang.Byte = null,
+ boolean_field: java.lang.Boolean = null,
+ decimal_field: java.math.BigDecimal = null,
+ date_field: java.sql.Date = null,
+ timestamp_field: java.sql.Timestamp = null,
+ array: Seq[Int] = null,
+ arrayContainsNull: Seq[Option[Int]] = null,
+ map: Map[Int, Long] = null,
+ mapValueContainsNull: Map[Int, Option[Long]] = null
+)
+
+object AllDataTypesWithComplexType {
+ def genTestData(): Seq[AllDataTypesWithComplexType] = {
+ (0 to 199).map {
+ i =>
+ if (i % 100 == 1) {
+ AllDataTypesWithComplexType()
+ } else {
+ AllDataTypesWithComplexType(
+ s"$i",
+ i,
+ i.toLong,
+ i.toFloat,
+ i.toDouble,
+ i.toShort,
+ i.toByte,
+ i % 2 == 0,
+ new java.math.BigDecimal(i + ".56"),
+ Date.valueOf(new
Date(System.currentTimeMillis()).toLocalDate.plusDays(i % 10)),
+ Timestamp.valueOf(
+ new
Timestamp(System.currentTimeMillis()).toLocalDateTime.plusDays(i % 10)),
+ Seq.apply(i + 1, i + 2, i + 3),
+ Seq.apply(Option.apply(i + 1), Option.empty, Option.apply(i + 3)),
+ Map.apply((i + 1, i + 2), (i + 3, i + 4)),
+ Map.empty
+ )
+ }
+ }
+ }
+}
diff --git a/gluten-celeborn/clickhouse/pom.xml
b/gluten-celeborn/clickhouse/pom.xml
index 284a8f572..9e64e77ce 100755
--- a/gluten-celeborn/clickhouse/pom.xml
+++ b/gluten-celeborn/clickhouse/pom.xml
@@ -148,6 +148,38 @@
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git
a/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index d2626ab27..3d26dd16c 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -20,11 +20,11 @@ import org.apache.gluten.extension.GlutenPlan
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
/**
- * attention: if AQE is enable,This method will only be executed correctly
after the execution plan
+ * attention: if AQE is enabled, this method will only be executed correctly
after the execution plan
* is fully determined
*/
@@ -42,10 +42,14 @@ object FallbackUtil extends Logging with
AdaptiveSparkPlanHelper {
true
case WholeStageCodegenExec(_) =>
true
+ case ColumnarInputAdapter(_) =>
+ true
case InputAdapter(_) =>
true
case AdaptiveSparkPlanExec(_, _, _, _, _) =>
true
+ case AQEShuffleReadExec(_, _) =>
+ true
case _: LimitExec =>
true
// for ut
@@ -57,30 +61,15 @@ object FallbackUtil extends Logging with
AdaptiveSparkPlanHelper {
true
case _: ReusedExchangeExec =>
true
- case p: SparkPlan if p.supportsColumnar =>
- true
case _ =>
false
}
}
def hasFallback(plan: SparkPlan): Boolean = {
- var fallbackOperator: Seq[SparkPlan] = null
- if (plan.isInstanceOf[AdaptiveSparkPlanExec]) {
- fallbackOperator = collectWithSubqueries(plan) {
- case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
- plan
- }
- } else {
- fallbackOperator = plan.collectWithSubqueries {
- case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
- plan
- }
- }
-
- if (fallbackOperator.nonEmpty) {
- fallbackOperator.foreach(operator => log.info(s"gluten fallback
operator:{$operator}"))
- }
+ val fallbackOperator = collectWithSubqueries(plan) { case plan => plan
}.filterNot(
+ plan => plan.isInstanceOf[GlutenPlan] || skip(plan))
+ fallbackOperator.foreach(operator => log.info(s"gluten fallback
operator:{$operator}"))
fallbackOperator.nonEmpty
}
}
diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml
index 90644b832..a016eccae 100644
--- a/gluten-ut/pom.xml
+++ b/gluten-ut/pom.xml
@@ -31,7 +31,7 @@
<artifactId>gluten-ut</artifactId>
<packaging>pom</packaging>
- <name>Gluten Unit Test</name>
+ <name>Gluten Unit Test Parent</name>
<dependencies>
<!-- put gluten-core at first so that overwritten spark classes (such as
BatchScanExec,
diff --git a/gluten-ut/spark35/pom.xml b/gluten-ut/spark35/pom.xml
index f86db23e2..1750a5e27 100644
--- a/gluten-ut/spark35/pom.xml
+++ b/gluten-ut/spark35/pom.xml
@@ -36,11 +36,6 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>2.15.1</version>
- </dependency>
</dependencies>
<profiles>
@@ -62,6 +57,38 @@
<version>${celeborn.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/gluten-ut/test/pom.xml b/gluten-ut/test/pom.xml
index cdac91d6f..fb637d548 100644
--- a/gluten-ut/test/pom.xml
+++ b/gluten-ut/test/pom.xml
@@ -29,13 +29,6 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
- <dependency>
- <groupId>org.apache.gluten</groupId>
- <artifactId>gluten-core</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
</dependencies>
<profiles>
@@ -57,6 +50,38 @@
<version>${celeborn.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</profile>
<profile>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]