This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 79e1d588d [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620) (#6150)
79e1d588d is described below

commit 79e1d588dcad5a655c2e5d363d18f8d695ef1cee
Author: Kyligence Git <[email protected]>
AuthorDate: Thu Jun 20 03:04:25 2024 -0500

    [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620) (#6150)
    
    * [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620)
    
    * Fix the build after https://github.com/ClickHouse/ClickHouse/pull/61047 replaced SquashingTransform with Squashing (a before/after sketch follows the SparkMergeTreeWriter.cpp diff below)
    
    * Fix style
    
    * Use assertResult instead of assert, so the actual result is reported when an assertion fails (a minimal sketch follows this list).
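    
      A minimal ScalaTest sketch of the difference (the suite and the failing
      value are hypothetical, not part of this commit):
    
          import org.scalatest.funsuite.AnyFunSuite
    
          class AssertStyleSuite extends AnyFunSuite {
            test("report the actual value on failure") {
              val addFilesSize = 2 // hypothetical wrong value
              // assert buries the expectation inside a boolean expression:
              //   assert(addFilesSize == 1)
              // assertResult names the expected value up front and fails with
              // "Expected 1, but got 2":
              assertResult(1)(addFilesSize)
            }
          }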
    
    ---------
    
    Co-authored-by: kyligence-git <[email protected]>
    Co-authored-by: Chang Chen <[email protected]>
---
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 111 +++++++++---------
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  | 127 ++++++++++-----------
 cpp-ch/clickhouse.version                          |   4 +-
 .../Storages/Mergetree/SparkMergeTreeWriter.cpp    |  32 +++---
 .../Storages/Mergetree/SparkMergeTreeWriter.h      |  13 ++-
 5 files changed, 141 insertions(+), 146 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 56b8f056b..572d0cd50 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -57,6 +57,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
         "false")
+    // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data") // for local test
   }
 
   override protected def beforeEach(): Unit = {
@@ -139,7 +140,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
         val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -151,8 +152,8 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 1)
-        assert(addFiles.head.rows == 600572)
+        assertResult(1)(addFiles.size)
+        assertResult(600572)(addFiles.head.rows)
     }
     spark.sql("drop table lineitem_mergetree_hdfs")
   }
@@ -224,7 +225,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
         val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -232,24 +233,22 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assertResult("l_shipdate,l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .orderByKeyOption
             .get
-            .mkString(",")
-            .equals("l_shipdate,l_orderkey"))
-        assert(
+            .mkString(","))
+        assertResult("l_shipdate")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .primaryKeyOption
             .get
-            .mkString(",")
-            .equals("l_shipdate"))
+            .mkString(","))
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 1)
-        assert(addFiles.head.rows == 600572)
+        assertResult(1)(addFiles.size)
+        assertResult(600572)(addFiles.head.rows)
     }
     spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
   }
@@ -386,51 +385,49 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
       df =>
         val result = df.collect()
-        assert(result.length == 4)
-        assert(result(0).getString(0).equals("A"))
-        assert(result(0).getString(1).equals("F"))
-        assert(result(0).getDouble(2) == 7578058.0)
+        assertResult(4)(result.length)
+        assertResult("A")(result(0).getString(0))
+        assertResult("F")(result(0).getString(1))
+        assertResult(7578058.0)(result(0).getDouble(2))
 
-        assert(result(2).getString(0).equals("N"))
-        assert(result(2).getString(1).equals("O"))
-        assert(result(2).getDouble(2) == 7454519.0)
+        assertResult("N")(result(2).getString(0))
+        assertResult("O")(result(2).getString(1))
+        assertResult(7454519.0)(result(2).getDouble(2))
 
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
         val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-        assert(mergetreeScan.metrics("numFiles").value == 6)
+        assertResult(6)(mergetreeScan.metrics("numFiles").value)
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assertResult("l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .orderByKeyOption
             .get
-            .mkString(",")
-            .equals("l_orderkey"))
-        assert(
+            .mkString(","))
+        assertResult("l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .primaryKeyOption
             .get
-            .mkString(",")
-            .equals("l_orderkey"))
-        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-        assert(
+            .mkString(","))
+        assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+        assertResult("l_returnflag")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
-            .partitionColumns(0)
-            .equals("l_returnflag"))
+            .partitionColumns
+            .head)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
 
-        assert(addFiles.size == 6)
-        assert(addFiles.map(_.rows).sum == 750735)
+        assertResult(6)(addFiles.size)
+        assertResult(750735)(addFiles.map(_.rows).sum)
     }
     spark.sql("drop table lineitem_mergetree_partition_hdfs")
   }
@@ -503,36 +500,35 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-        assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
         if (sparkVersion.equals("3.2")) {
           assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
         } else {
-          assert(
+          assertResult("l_partkey")(
             ClickHouseTableV2
               .getTable(fileIndex.deltaLog)
               .orderByKeyOption
               .get
-              .mkString(",")
-              .equals("l_partkey"))
+              .mkString(","))
         }
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-        assert(
+        assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+        assertResult("l_returnflag")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
-            .partitionColumns(0)
-            .equals("l_returnflag"))
+            .partitionColumns
+            .head)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
 
-        assert(addFiles.size == 12)
-        assert(addFiles.map(_.rows).sum == 600572)
+        assertResult(12)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
     }
     spark.sql("drop table lineitem_mergetree_bucket_hdfs")
   }
@@ -585,39 +581,38 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-        assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
+        assertResult("l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .orderByKeyOption
             .get
-            .mkString(",")
-            .equals("l_orderkey"))
+            .mkString(","))
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty)
-        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-        assert(
+        assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+        assertResult("l_returnflag")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
-            .partitionColumns(0)
-            .equals("l_returnflag"))
+            .partitionColumns
+            .head)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
 
-        assert(addFiles.size == 12)
-        assert(addFiles.map(_.rows).sum == 600572)
+        assertResult(12)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
     }
 
     val result = spark.read
       .format("clickhouse")
       .load(dataPath)
       .count()
-    assert(result == 600572)
+    assertResult(600572)(result)
   }
 }
 // scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index c5dc3a237..30f443265 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -55,6 +55,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
+    // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data") // for local test
   }
 
   override protected def beforeEach(): Unit = {
@@ -152,7 +153,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
         val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -164,8 +165,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 1)
-        assert(addFiles.head.rows == 600572)
+        assertResult(1)(addFiles.size)
+        assertResult(600572)(addFiles.head.rows)
     }
     spark.sql("drop table lineitem_mergetree_s3") // clean up
   }
@@ -237,7 +238,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
         val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -245,24 +246,22 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assertResult("l_shipdate,l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .orderByKeyOption
             .get
-            .mkString(",")
-            .equals("l_shipdate,l_orderkey"))
-        assert(
+            .mkString(","))
+        assertResult("l_shipdate")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .primaryKeyOption
             .get
-            .mkString(",")
-            .equals("l_shipdate"))
+            .mkString(","))
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-        assert(addFiles.size == 1)
-        assert(addFiles.head.rows == 600572)
+        assertResult(1)(addFiles.size)
+        assertResult(600572)(addFiles.head.rows)
     }
     spark.sql("drop table lineitem_mergetree_orderbykey_s3")
   }
@@ -399,51 +398,49 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
       df =>
         val result = df.collect()
-        assert(result.length == 4)
-        assert(result(0).getString(0).equals("A"))
-        assert(result(0).getString(1).equals("F"))
-        assert(result(0).getDouble(2) == 7578058.0)
+        assertResult(4)(result.length)
+        assertResult("A")(result(0).getString(0))
+        assertResult("F")(result(0).getString(1))
+        assertResult(7578058.0)(result(0).getDouble(2))
 
-        assert(result(2).getString(0).equals("N"))
-        assert(result(2).getString(1).equals("O"))
-        assert(result(2).getDouble(2) == 7454519.0)
+        assertResult("N")(result(2).getString(0))
+        assertResult("O")(result(2).getString(1))
+        assertResult(7454519.0)(result(2).getDouble(2))
 
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
         val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-        assert(mergetreeScan.metrics("numFiles").value == 6)
+        assertResult(6)(mergetreeScan.metrics("numFiles").value)
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assertResult("l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .orderByKeyOption
             .get
-            .mkString(",")
-            .equals("l_orderkey"))
-        assert(
+            .mkString(","))
+        assertResult("l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .primaryKeyOption
             .get
-            .mkString(",")
-            .equals("l_orderkey"))
-        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-        assert(
+            .mkString(","))
+        assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+        assertResult("l_returnflag")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
-            .partitionColumns(0)
-            .equals("l_returnflag"))
+            .partitionColumns
+            .head)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
 
-        assert(addFiles.size == 6)
-        assert(addFiles.map(_.rows).sum == 750735)
+        assertResult(6)(addFiles.size)
+        assertResult(750735)(addFiles.map(_.rows).sum)
     }
     spark.sql("drop table lineitem_mergetree_partition_s3")
 
@@ -517,36 +514,35 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-        assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
         if (sparkVersion.equals("3.2")) {
           assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
         } else {
-          assert(
+          assertResult("l_partkey")(
             ClickHouseTableV2
               .getTable(fileIndex.deltaLog)
               .orderByKeyOption
               .get
-              .mkString(",")
-              .equals("l_partkey"))
+              .mkString(","))
         }
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-        assert(
+        assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+        assertResult("l_returnflag")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
-            .partitionColumns(0)
-            .equals("l_returnflag"))
+            .partitionColumns
+            .head)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
 
-        assert(addFiles.size == 12)
-        assert(addFiles.map(_.rows).sum == 600572)
+        assertResult(12)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
     }
     spark.sql("drop table lineitem_mergetree_bucket_s3")
   }
@@ -599,39 +595,38 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         val scanExec = collect(df.queryExecution.executedPlan) {
           case f: FileSourceScanExecTransformer => f
         }
-        assert(scanExec.size == 1)
+        assertResult(1)(scanExec.size)
 
-        val mergetreeScan = scanExec(0)
+        val mergetreeScan = scanExec.head
         assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
         val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-        assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-        assert(
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
+        assertResult("l_orderkey")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
             .orderByKeyOption
             .get
-            .mkString(",")
-            .equals("l_orderkey"))
+            .mkString(","))
         assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty)
-        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-        assert(
+        assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+        assertResult("l_returnflag")(
           ClickHouseTableV2
             .getTable(fileIndex.deltaLog)
-            .partitionColumns(0)
-            .equals("l_returnflag"))
+            .partitionColumns
+            .head)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
 
-        assert(addFiles.size == 12)
-        assert(addFiles.map(_.rows).sum == 600572)
+        assertResult(12)(addFiles.size)
+        assertResult(600572)(addFiles.map(_.rows).sum)
     }
 
     val result = spark.read
       .format("clickhouse")
       .load(dataPath)
       .count()
-    assert(result == 600572)
+    assertResult(600572)(result)
   }
 
   test("test mergetree insert with optimize basic") {
@@ -639,8 +634,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     val dataPath = s"s3a://$BUCKET_NAME/$tableName"
 
     withSQLConf(
-      ("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
-      ("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
+      "spark.databricks.delta.optimize.minFileSize" -> "200000000",
+      "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
     ) {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS $tableName;
@@ -654,7 +649,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
                    |""".stripMargin)
 
       val ret = spark.sql(s"select count(*) from $tableName").collect()
-      assert(ret.apply(0).get(0) == 600572)
+      assertResult(600572)(ret.apply(0).get(0))
       assert(
         !new File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists())
     }
@@ -713,22 +708,22 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
          |""".stripMargin
 
     withSQLConf(
-      ("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> "true")) {
+      "spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> "true") {
       runTPCHQueryBySQL(6, sqlStr) {
         df =>
           val scanExec = collect(df.queryExecution.executedPlan) {
             case f: FileSourceScanExecTransformer => f
           }
-          assert(scanExec.size == 1)
+          assertResult(1)(scanExec.size)
 
-          val mergetreeScan = scanExec(0)
+          val mergetreeScan = scanExec.head
           assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
 
           val plans = collect(df.queryExecution.executedPlan) {
             case scanExec: BasicScanExecTransformer => scanExec
           }
-          assert(plans.size == 1)
-          assert(plans(0).getSplitInfos.size == 1)
+          assertResult(1)(plans.size)
+          assertResult(1)(plans.head.getSplitInfos.size)
       }
     }
   }
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 2bbb29453..1e3ac8d88 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20240616
-CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da
+CH_BRANCH=rebase_ch/20240620
+CH_COMMIT=f9c3886a767
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 259af5698..c1f2391a2 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -87,8 +87,7 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
     metadata_snapshot = storage->getInMemoryMetadataPtr();
     header = metadata_snapshot->getSampleBlock();
     const DB::Settings & settings = context->getSettingsRef();
-    squashing_transform
-        = std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
+    squashing = std::make_unique<DB::Squashing>(header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
     if (!partition_dir.empty())
         extractPartitionValues(partition_dir, partition_values);
 
@@ -105,25 +104,33 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
     merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ? merge_limit_parts : limit_cnt_field.get<Int64>();
 }
 
-void SparkMergeTreeWriter::write(DB::Block & block)
+void SparkMergeTreeWriter::write(const DB::Block & block)
 {
     auto new_block = removeColumnSuffix(block);
     if (auto converter = ActionsDAG::makeConvertingActions(
             new_block.getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(), DB::ActionsDAG::MatchColumnsMode::Position))
         ExpressionActions(converter).execute(new_block);
 
-    if (auto add_block = squashing_transform->add(new_block))
+    bool has_part = chunkToPart(squashing->add({new_block.getColumns(), new_block.rows()}));
+
+    if (has_part && merge_after_insert)
+        checkAndMerge();
+}
+
+bool SparkMergeTreeWriter::chunkToPart(Chunk && chunk)
+{
+    if (chunk.hasChunkInfo())
     {
-        bool has_part = blockToPart(add_block);
-        if (has_part && merge_after_insert)
-            checkAndMerge();
+        Chunk squash_chunk = DB::Squashing::squash(std::move(chunk));
+        Block result = header.cloneWithColumns(squash_chunk.getColumns());
+        return blockToPart(result);
     }
+    return false;
 }
 
 bool SparkMergeTreeWriter::blockToPart(Block & block)
 {
-    auto blocks_with_partition
-        = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context);
+    auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context);
 
     if (blocks_with_partition.empty())
         return false;
@@ -180,12 +187,7 @@ void SparkMergeTreeWriter::manualFreeMemory(size_t before_write_memory)
 
 void SparkMergeTreeWriter::finalize()
 {
-    if (auto block = squashing_transform->add({}))
-    {
-        if (block.rows())
-            blockToPart(block);
-    }
-
+    chunkToPart(squashing->flush());
     if (merge_after_insert)
         finalizeMerge();
 
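
For reference, a minimal before/after sketch of the SquashingTransform ->
Squashing migration applied in the hunks above; the standalone helper below is
illustrative only, while the DB:: calls mirror the diff:

    #include <Interpreters/Squashing.h>

    // Before (pre-ClickHouse#61047): SquashingTransform consumed and returned
    // Blocks directly:
    //   squashing_transform = std::make_unique<DB::SquashingTransform>(
    //       settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
    //   if (auto block = squashing_transform->add(new_block)) { /* blockToPart(block); */ }

    // After: Squashing is constructed with the header and accumulates Chunks;
    // a ready chunk is squashed via the static squash() and materialized back
    // into a Block using the header's column structure.
    DB::Block squashSketch(DB::Squashing & squashing, const DB::Block & header, const DB::Block & input)
    {
        DB::Chunk chunk = squashing.add({input.getColumns(), input.rows()});
        if (!chunk.hasChunkInfo())
            return {}; // nothing buffered past the rows/bytes thresholds yet
        DB::Chunk squashed = DB::Squashing::squash(std::move(chunk));
        return header.cloneWithColumns(squashed.getColumns());
    }
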
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 5c4b66403..2b07521ed 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -17,7 +17,7 @@
 #pragma once
 
 #include <Interpreters/Context.h>
-#include <Interpreters/SquashingTransform.h>
+#include <Interpreters/Squashing.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <Storages/MergeTree/MergeTreeDataWriter.h>
 #include <Storages/StorageMergeTreeFactory.h>
@@ -59,13 +59,15 @@ public:
         const String & partition_dir_ = "",
         const String & bucket_dir_ = "");
 
-    void write(DB::Block & block);
+    void write(const DB::Block & block);
     void finalize();
     std::vector<PartInfo> getAllPartInfo();
 
 private:
-    void
-    writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part, DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
+    void writeTempPart(
+        MergeTreeDataWriter::TemporaryPart & temp_part,
+        DB::BlockWithPartition & block_with_partition,
+        const DB::StorageMetadataPtr & metadata_snapshot);
     DB::MergeTreeDataWriter::TemporaryPart
     writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
     void checkAndMerge(bool force = false);
@@ -75,6 +77,7 @@ private:
     void saveMetadata();
     void commitPartToRemoteStorageIfNeeded();
     void finalizeMerge();
+    bool chunkToPart(Chunk && chunk);
     bool blockToPart(Block & block);
 
     CustomStorageMergeTreePtr storage = nullptr;
@@ -87,7 +90,7 @@ private:
     String bucket_dir;
 
     DB::ContextPtr context;
-    std::unique_ptr<DB::SquashingTransform> squashing_transform;
+    std::unique_ptr<DB::Squashing> squashing;
     int part_num = 1;
     ConcurrentDeque<DB::MergeTreeDataPartPtr> new_parts;
     std::unordered_map<String, String> partition_values;

