This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c1f90a766 Fix bucket table create error (#7156)
c1f90a766 is described below

commit c1f90a766420bcf259af5c2a07b46c8a09b8e0c0
Author: Shuai li <[email protected]>
AuthorDate: Mon Sep 9 10:09:22 2024 +0800

    Fix bucket table create error (#7156)
---
 .../v1/clickhouse/MergeTreeFileFormatWriter.scala  |  9 ++++----
 .../hive/GlutenClickHouseHiveTableSuite.scala      | 24 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index 26bc4edbd..f1489b86b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -165,13 +165,14 @@ object MergeTreeFileFormatWriter extends Logging {
       if (writerBucketSpec.isDefined) {
         // We need to add the bucket id expression to the output of the sort 
plan,
         // so that we can use backend to calculate the bucket id for each row.
-        wrapped = ProjectExec(
-          wrapped.output :+ Alias(writerBucketSpec.get.bucketIdExpression, 
"__bucket_value__")(),
-          wrapped)
+        val bucketValueExpr = bindReferences(
+          Seq(writerBucketSpec.get.bucketIdExpression),
+          finalOutputSpec.outputColumns)
+        wrapped =
+          ProjectExec(wrapped.output :+ Alias(bucketValueExpr.head, 
"__bucket_value__")(), wrapped)
         // TODO: to optimize, bucket value is computed twice here
       }
 
-      val nativeFormat = 
sparkSession.sparkContext.getLocalProperty("nativeFormat")
       
(GlutenMergeTreeWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped),
 None)
     }
 
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index 4018eee6b..b16ae3c11 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -1291,6 +1291,30 @@ class GlutenClickHouseHiveTableSuite
       .mode(SaveMode.Overwrite)
       .save(dataPath3)
     assert(new File(dataPath3).listFiles().nonEmpty)
+
+    val dataPath4 = s"$basePath/lineitem_mergetree_bucket2"
+    val df4 = spark
+      .sql(s"""
+              |select
+              |  INT_FIELD ,
+              |  STRING_FIELD,
+              |  LONG_FIELD ,
+              |  DATE_FIELD
+              | from $txt_table_name
+              | order by INT_FIELD
+              |""".stripMargin)
+      .toDF("INT_FIELD", "STRING_FIELD", "LONG_FIELD", "DATE_FIELD")
+
+    df4.write
+      .format("clickhouse")
+      .partitionBy("DATE_FIELD")
+      .option("clickhouse.numBuckets", "3")
+      .option("clickhouse.bucketColumnNames", "STRING_FIELD")
+      .option("clickhouse.orderByKey", "INT_FIELD,LONG_FIELD")
+      .option("clickhouse.primaryKey", "INT_FIELD")
+      .mode(SaveMode.Append)
+      .save(dataPath4)
+    assert(new File(dataPath4).listFiles().nonEmpty)
   }
 
   test("GLUTEN-6506: Orc read time zone") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to