This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c1f90a766 Fix bucket table create error (#7156)
c1f90a766 is described below
commit c1f90a766420bcf259af5c2a07b46c8a09b8e0c0
Author: Shuai li <[email protected]>
AuthorDate: Mon Sep 9 10:09:22 2024 +0800
Fix bucket table create error (#7156)
---
.../v1/clickhouse/MergeTreeFileFormatWriter.scala | 9 ++++----
.../hive/GlutenClickHouseHiveTableSuite.scala | 24 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index 26bc4edbd..f1489b86b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -165,13 +165,14 @@ object MergeTreeFileFormatWriter extends Logging {
if (writerBucketSpec.isDefined) {
// We need to add the bucket id expression to the output of the sort
plan,
// so that we can use backend to calculate the bucket id for each row.
- wrapped = ProjectExec(
- wrapped.output :+ Alias(writerBucketSpec.get.bucketIdExpression,
"__bucket_value__")(),
- wrapped)
+ val bucketValueExpr = bindReferences(
+ Seq(writerBucketSpec.get.bucketIdExpression),
+ finalOutputSpec.outputColumns)
+ wrapped =
+ ProjectExec(wrapped.output :+ Alias(bucketValueExpr.head,
"__bucket_value__")(), wrapped)
// TODO: to optimize, bucket value is computed twice here
}
- val nativeFormat =
sparkSession.sparkContext.getLocalProperty("nativeFormat")
(GlutenMergeTreeWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped),
None)
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index 4018eee6b..b16ae3c11 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -1291,6 +1291,30 @@ class GlutenClickHouseHiveTableSuite
.mode(SaveMode.Overwrite)
.save(dataPath3)
assert(new File(dataPath3).listFiles().nonEmpty)
+
+ val dataPath4 = s"$basePath/lineitem_mergetree_bucket2"
+ val df4 = spark
+ .sql(s"""
+ |select
+ | INT_FIELD ,
+ | STRING_FIELD,
+ | LONG_FIELD ,
+ | DATE_FIELD
+ | from $txt_table_name
+ | order by INT_FIELD
+ |""".stripMargin)
+ .toDF("INT_FIELD", "STRING_FIELD", "LONG_FIELD", "DATE_FIELD")
+
+ df4.write
+ .format("clickhouse")
+ .partitionBy("DATE_FIELD")
+ .option("clickhouse.numBuckets", "3")
+ .option("clickhouse.bucketColumnNames", "STRING_FIELD")
+ .option("clickhouse.orderByKey", "INT_FIELD,LONG_FIELD")
+ .option("clickhouse.primaryKey", "INT_FIELD")
+ .mode(SaveMode.Append)
+ .save(dataPath4)
+ assert(new File(dataPath4).listFiles().nonEmpty)
}
test("GLUTEN-6506: Orc read time zone") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]