This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3d1d79cc3 [GLUTEN-7344][CH] Fix the error default database name and
table name for the mergetree file format when using path based (#7346)
3d1d79cc3 is described below
commit 3d1d79cc33771ed376806dd5bcfab5304e29ef5d
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Sep 25 18:32:15 2024 +0800
[GLUTEN-7344][CH] Fix the error default database name and table name for
the mergetree file format when using path based (#7346)
What changes were proposed in this pull request?
Fix the error default database name and table name for the mergetree file
format when using path based
Closes #7344.
How was this patch tested?
Added a new unit test ("GLUTEN-7344: Fix the error default database name and
table name for the mergetree file format when using path based") to
GlutenClickHouseMergeTreePathBasedWriteSuite, covering path-based MergeTree
writes and reads (see the diff below).
---
.../sql/delta/catalog/ClickHouseTableV2Base.scala | 4 +-
.../utils/MergeTreePartsPartitionsUtil.scala | 2 +-
...tenClickHouseMergeTreePathBasedWriteSuite.scala | 59 ++++++++++++++++++++++
3 files changed, 63 insertions(+), 2 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index b1d9760d7..4d01c3798 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -29,6 +29,8 @@ import java.{util => ju}
trait ClickHouseTableV2Base {
+ val DEFAULT_DATABASE = "clickhouse_db"
+
def deltaProperties(): ju.Map[String, String]
def deltaCatalog(): Option[CatalogTable]
@@ -39,7 +41,7 @@ trait ClickHouseTableV2Base {
lazy val dataBaseName = deltaCatalog
.map(_.identifier.database.getOrElse("default"))
- .getOrElse("clickhouse")
+ .getOrElse(DEFAULT_DATABASE)
lazy val tableName = deltaCatalog
.map(_.identifier.table)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index e300533bd..b2ca931c0 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -83,7 +83,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
(table.catalogTable.get.identifier.database.get,
table.catalogTable.get.identifier.table)
} else {
// for file_format.`file_path`
- ("default", "file_format")
+ (table.DEFAULT_DATABASE, table.deltaPath.toUri.getPath)
}
val engine = "MergeTree"
val relativeTablePath =
fileIndex.deltaLog.dataPath.toUri.getPath.substring(1)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 0a8860aaa..a1dd5d868 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -1337,4 +1337,63 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
}
}
+
+ test(
+ "GLUTEN-7344: Fix the error default database name and table " +
+ "name for the mergetree file format when using path based") {
+ val dataPath = s"$basePath/lineitem_filebased_7344"
+ clearDataPath(dataPath)
+ val dataPath1 = s"$basePath/lineitem_filebased_7344_1"
+ clearDataPath(dataPath1)
+
+ val sourceDF = spark.sql(s"""
+ |select * from lineitem
+ |""".stripMargin)
+
+ sourceDF.write
+ .format("clickhouse")
+ .mode(SaveMode.Append)
+ .option("clickhouse.orderByKey", "l_shipdate,l_orderkey")
+ .option("clickhouse.primaryKey", "l_shipdate")
+ .option("clickhouse.lowCardKey", "l_returnflag,l_linestatus")
+ .save(dataPath)
+
+ sourceDF.write
+ .format("clickhouse")
+ .mode(SaveMode.Append)
+ .option("clickhouse.orderByKey", "l_shipdate,l_orderkey")
+ .option("clickhouse.primaryKey", "l_shipdate")
+ .option("clickhouse.lowCardKey", "l_returnflag,l_linestatus")
+ .save(dataPath1)
+
+ val df = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ val result = df.collect()
+ assertResult(600572)(result.size)
+
+ val plans = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ val partitions = plans(0).getPartitions
+ assert(partitions.nonEmpty)
+ assert(partitions(0).isInstanceOf[GlutenMergeTreePartition])
+
+ assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db"))
+
+ assert(partitions(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath))
+
+ val df1 = spark.read
+ .format("clickhouse")
+ .load(dataPath1)
+ val result1 = df1.collect()
+ assertResult(600572)(result.size)
+
+ val plans1 = collect(df1.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ val partitions1 = plans1(0).getPartitions
+ assert(partitions1.nonEmpty)
+ assert(partitions1(0).isInstanceOf[GlutenMergeTreePartition])
+
+ assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].database.equals("clickhouse_db"))
+
+ assert(partitions1(0).asInstanceOf[GlutenMergeTreePartition].table.equals(dataPath1))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]