This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new eaf6548e3b [GLUTEN-7028][CH][Part-15] [MINOR] Fix UTs (#8364)
eaf6548e3b is described below

commit eaf6548e3b31988048450d564576fbff6656a05b
Author: Chang chen <[email protected]>
AuthorDate: Mon Dec 30 11:41:07 2024 +0800

    [GLUTEN-7028][CH][Part-15] [MINOR] Fix UTs (#8364)
    
    * [Fix UT] spark.databricks.delta.stats.skipping -> false
    
    * [Fix UT] Bucket table is not supported
    
    * [Fix UT] 'test cache mergetree data no partition columns' already fixed by (#8346)
    
    * [UT] open ignore test
    
    * [MINOR REFACTOR] Pass by const reference instead of pass by value
    
    * [MINOR REFACTOR] validatedPartitionID
    
    * [Fix Bug] decode part name
    
    * clang 19 fix
---
 .../delta/ClickhouseOptimisticTransaction.scala    |  25 ++-
 .../sql/execution/FileDeltaColumnarWrite.scala     |  10 +-
 .../execution/MergeTreeDeltaColumnarWrite.scala    |  10 +-
 .../spark/sql/execution/CHColumnarWrite.scala      |  14 +-
 .../GlutenClickHouseMergeTreeCacheDataSuite.scala  |   2 +-
 ...tenClickHouseMergeTreePathBasedWriteSuite.scala | 185 +++++++++++----------
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala |   2 +-
 ...eMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala |   2 +-
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  |   2 +-
 .../GlutenClickHouseMergeTreeWriteSuite.scala      |   2 +-
 .../SparkFunctionDecimalBinaryArithmetic.cpp       |   4 +-
 .../Storages/MergeTree/SparkMergeTreeMeta.cpp      |   4 +-
 .../Storages/MergeTree/StorageMergeTreeFactory.cpp |   2 +-
 .../Storages/MergeTree/StorageMergeTreeFactory.h   |   2 +-
 .../Storages/SubstraitSource/ReadBufferBuilder.cpp |   4 +-
 .../Storages/SubstraitSource/ReadBufferBuilder.h   |   2 +-
 16 files changed, 146 insertions(+), 126 deletions(-)

diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 6c7b2ab876..a4411e83f2 100644
--- 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -340,20 +340,19 @@ class ClickhouseOptimisticTransaction(
 
     var resultFiles =
       (if (optionalStatsTracker.isDefined) {
-         committer.addedStatuses.map {
-           a =>
-             a.copy(stats =
-               
optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
-         }
-       } else {
-         committer.addedStatuses
-       })
+        committer.addedStatuses.map { a =>
+          a.copy(stats = optionalStatsTracker.map(
+            _.recordedStats(a.toPath.getName)).getOrElse(a.stats))
+        }
+      }
+      else {
+        committer.addedStatuses
+      })
         .filter {
-          // In some cases, we can write out an empty `inputData`. Some 
examples of this (though,
-          // they may be fixed in the future) are the MERGE command when you 
delete with empty
-          // source, or empty target, or on disjoint tables. This is hard to 
catch before
-          // the write without collecting the DF ahead of time. Instead,
-          // we can return only the AddFiles that
+          // In some cases, we can write out an empty `inputData`. Some 
examples of this (though, they
+          // may be fixed in the future) are the MERGE command when you delete 
with empty source, or
+          // empty target, or on disjoint tables. This is hard to catch before 
the write without
+          // collecting the DF ahead of time. Instead, we can return only the 
AddFiles that
           // a) actually add rows, or
           // b) don't have any stats so we don't know the number of rows at all
           case a: AddFile => a.numLogicalRecords.forall(_ > 0)
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
index b02eed88e6..4d6040bae8 100644
--- 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.vectorized.NativeExpressionEvaluator
-
+import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.catalyst.InternalRow
@@ -37,11 +37,11 @@ case class DeltaFileCommitInfo(committer: 
FileDelayedCommitProtocol)
   val addedFiles: ArrayBuffer[(Map[String, String], String)] =
     new ArrayBuffer[(Map[String, String], String)]
   override def apply(stat: NativeFileWriteResult): Unit = {
-    if (stat.partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
-      addedFiles.append((Map.empty[String, String], stat.filename))
-    } else {
+    if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
       val partitionValues = committer.parsePartitions(stat.partition_id)
-      addedFiles.append((partitionValues, stat.relativePath))
+      addedFiles.append((partitionValues, new 
Path(stat.relativePath).toUri.toString))
+    } else {
+      addedFiles.append((Map.empty[String, String], stat.filename))
     }
   }
 
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
index df33fde66c..d88b6d90b7 100644
--- 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
@@ -52,10 +52,10 @@ case class MergeTreeWriteResult(
       path: Path,
       modificationTime: Long,
       hostName: Seq[String]): FileAction = {
-    val (partitionValues, part_path) = if (partition_id == 
CHColumnarWrite.EMPTY_PARTITION_ID) {
-      (Map.empty[String, String], part_name)
-    } else {
+    val (partitionValues, part_path) = if 
(CHColumnarWrite.validatedPartitionID(partition_id)) {
       (MergeTreePartitionUtils.parsePartitions(partition_id), 
s"$partition_id/$part_name")
+    } else {
+      (Map.empty[String, String], part_name)
     }
     val tags = Map[String, String](
       "database" -> database,
@@ -88,7 +88,7 @@ case class MergeTreeWriteResult(
       DeltaStatistics.NULL_COUNT -> ""
     )
     AddFile(
-      part_path,
+      new Path(part_path).toUri.toString,
       partitionValues,
       size_in_bytes,
       modificationTime,
@@ -153,7 +153,7 @@ case class MergeTreeBasicWriteTaskStatsTracker() extends 
(MergeTreeWriteResult =
   private var numFiles: Int = 0
 
   def apply(stat: MergeTreeWriteResult): Unit = {
-    if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
+    if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
       partitions.append(new GenericInternalRow(Array[Any](stat.partition_id)))
     }
     numFiles += 1
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
index 87fc50ef3a..e453fc46a3 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
@@ -158,10 +158,10 @@ case class HadoopMapReduceAdapter(sparkCommitter: 
HadoopMapReduceCommitProtocol)
 }
 
 case class NativeFileWriteResult(filename: String, partition_id: String, 
record_count: Long) {
-  lazy val relativePath: String = if (partition_id == 
CHColumnarWrite.EMPTY_PARTITION_ID) {
-    filename
-  } else {
+  lazy val relativePath: String = if 
(CHColumnarWrite.validatedPartitionID(partition_id)) {
     s"$partition_id/$filename"
+  } else {
+    filename
   }
 }
 
@@ -212,7 +212,7 @@ case class NativeBasicWriteTaskStatsTracker(
   private var numWrittenRows: Long = 0
   override def apply(stat: NativeFileWriteResult): Unit = {
     val absolutePath = s"$writeDir/${stat.relativePath}"
-    if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
+    if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
       basicWriteJobStatsTracker.newPartition(new 
GenericInternalRow(Array[Any](stat.partition_id)))
     }
     basicWriteJobStatsTracker.newFile(absolutePath)
@@ -233,7 +233,7 @@ case class FileCommitInfo(description: WriteJobDescription)
 
   def apply(stat: NativeFileWriteResult): Unit = {
     val tmpAbsolutePath = s"${description.path}/${stat.relativePath}"
-    if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
+    if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
       partitions += stat.partition_id
       val customOutputPath =
         description.customPartitionLocations.get(
@@ -325,5 +325,7 @@ object CHColumnarWrite {
     case other => CHDeltaColumnarWrite(jobTrackerID, description, other)
   }
 
-  val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"
+  private val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"
+
+  def validatedPartitionID(partitionID: String): Boolean = partitionID != 
EMPTY_PARTITION_ID
 }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index ffb1fa4012..e8cefca818 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -401,7 +401,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
     spark.sql("drop table lineitem_mergetree_hdfs purge")
   }
 
-  testSparkVersionLE33("test cache mergetree data no partition columns") {
+  test("test cache mergetree data no partition columns") {
 
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 923def03e9..d09f48b59b 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -302,7 +302,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
     }
   }
 
-  testSparkVersionLE33("test mergetree path based table update") {
+  test("test mergetree path based table update") {
     val dataPath = s"$basePath/lineitem_mergetree_update"
     clearDataPath(dataPath)
 
@@ -315,78 +315,87 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       .mode(SaveMode.Append)
       .save(dataPath)
 
-    spark.sql(s"""
-                 | update clickhouse.`$dataPath` set l_returnflag = 'Z' where 
l_orderkey = 12647
-                 |""".stripMargin)
-
-    {
-      val df = spark.read
-        .format("clickhouse")
-        .load(dataPath)
-        .where("l_returnflag = 'Z'")
-      assertResult(1)(df.count())
-      val scanExec = collect(df.queryExecution.executedPlan) {
-        case f: FileSourceScanExecTransformer => f
-      }
-      assertResult(1)(scanExec.size)
+    /**
+     * TODO: new test for (spark.databricks.delta.stats.skipping -> true)
+     *
+     * Since one pipeline write will collect stats, so that pruning will be 
more accurate in point
+     * query. Let's add a new test when we implement lightweight update and 
delete.
+     */
+    withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
+      spark.sql(s"""
+                   | update clickhouse.`$dataPath` set l_returnflag = 'Z' 
where l_orderkey = 12647
+                   |""".stripMargin)
+
+      {
+        val df = spark.read
+          .format("clickhouse")
+          .load(dataPath)
+          .where("l_returnflag = 'Z'")
+        assertResult(1)(df.count())
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
 
-      val mergetreeScan = scanExec.head
-      assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
 
-      val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-      assert(
-        ClickHouseTableV2
-          .getTable(fileIndex.deltaLog)
-          .orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
-      assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
-      
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
-      val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
-      assertResult(600572)(addFiles.map(_.rows).sum)
-      // 4 parts belong to the first batch
-      // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
-      assertResult(6)(addFiles.size)
-      val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
-      assertResult(2)(filePaths.size)
-      assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
-    }
+        val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
+        
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
+        
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
+        assertResult(600572)(addFiles.map(_.rows).sum)
+        // 4 parts belong to the first batch
+        // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
+        assertResult(6)(addFiles.size)
+        val filePaths =
+          addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
+        assertResult(2)(filePaths.size)
+        assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
+      }
 
-    val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
-    clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> 
"'X'"))
+      val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
+      clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" -> 
"'X'"))
 
-    {
-      val df = spark.read
-        .format("clickhouse")
-        .load(dataPath)
-        .where("l_returnflag = 'X'")
-      assertResult(1)(df.count())
-      val scanExec = collect(df.queryExecution.executedPlan) {
-        case f: FileSourceScanExecTransformer => f
-      }
-      assertResult(1)(scanExec.size)
+      {
+        val df = spark.read
+          .format("clickhouse")
+          .load(dataPath)
+          .where("l_returnflag = 'X'")
+        assertResult(1)(df.count())
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
 
-      val mergetreeScan = scanExec.head
-      assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
 
-      val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-      val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
-      assertResult(600572)(addFiles.map(_.rows).sum)
+        val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
+        assertResult(600572)(addFiles.map(_.rows).sum)
 
-      // 4 parts belong to the first batch
-      // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
-      assertResult(6)(addFiles.size)
-      val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
-      assertResult(2)(filePaths.size)
+        // 4 parts belong to the first batch
+        // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
+        assertResult(6)(addFiles.size)
+        val filePaths =
+          addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
+        assertResult(2)(filePaths.size)
+      }
     }
-
     val df = spark.read
       .format("clickhouse")
       .load(dataPath)
     assertResult(600572)(df.count())
   }
 
-  testSparkVersionLE33("test mergetree path based table delete") {
+  test("test mergetree path based table delete") {
     val dataPath = s"$basePath/lineitem_mergetree_delete"
     clearDataPath(dataPath)
 
@@ -399,32 +408,40 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       .mode(SaveMode.Append)
       .save(dataPath)
 
-    spark.sql(s"""
-                 | delete from clickhouse.`$dataPath` where l_orderkey = 12647
-                 |""".stripMargin)
-    val df = spark.read
-      .format("clickhouse")
-      .load(dataPath)
-    assertResult(600571)(df.count())
-    val scanExec = collect(df.queryExecution.executedPlan) {
-      case f: FileSourceScanExecTransformer => f
+    /**
+     * TODO: new test for (spark.databricks.delta.stats.skipping -> true)
+     *
+     * Since one pipeline write will collect stats, so that pruning will be 
more accurate in point
+     * query. Let's add a new test when we implement lightweight update and 
delete.
+     */
+    withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
+      spark.sql(s"""
+                   | delete from clickhouse.`$dataPath` where l_orderkey = 
12647
+                   |""".stripMargin)
+      val df = spark.read
+        .format("clickhouse")
+        .load(dataPath)
+      assertResult(600571)(df.count())
+      val scanExec = collect(df.queryExecution.executedPlan) {
+        case f: FileSourceScanExecTransformer => f
+      }
+      val mergetreeScan = scanExec.head
+      val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+      val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
+      // 4 parts belong to the first batch
+      // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
+      assertResult(6)(addFiles.size)
+      val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
+      assertResult(2)(filePaths.size)
+      assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
+
+      val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
+      clickhouseTable.delete("mod(l_orderkey, 3) = 2")
+      val df1 = spark.read
+        .format("clickhouse")
+        .load(dataPath)
+      assertResult(400089)(df1.count())
     }
-    val mergetreeScan = scanExec.head
-    val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-    val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
-    // 4 parts belong to the first batch
-    // 2 parts belong to the second batch (1 actual updated part, 1 passively 
updated).
-    assertResult(6)(addFiles.size)
-    val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
-    assertResult(2)(filePaths.size)
-    assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
-
-    val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
-    clickhouseTable.delete("mod(l_orderkey, 3) = 2")
-    val df1 = spark.read
-      .format("clickhouse")
-      .load(dataPath)
-    assertResult(400089)(df1.count())
   }
 
   test("test mergetree path based table upsert") {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 91a22c4f34..7bac0d6dcf 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -436,7 +436,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_bucket_hdfs")
   }
 
-  testSparkVersionLE33("test mergetree write with the path based") {
+  testSparkVersionLE33("test mergetree write with the path based bucket 
table") {
     val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
 
     val sourceDF = spark.sql(s"""
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
index d4a5ddd3a8..980e696255 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -435,7 +435,7 @@ class 
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
     spark.sql("drop table lineitem_mergetree_bucket_hdfs purge")
   }
 
-  testSparkVersionLE33("test mergetree write with the path based") {
+  testSparkVersionLE33("test mergetree write with the path based bucket 
table") {
     val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
 
     val sourceDF = spark.sql(s"""
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 4846852256..74046e4ed4 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -496,7 +496,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
     spark.sql("drop table lineitem_mergetree_bucket_s3")
   }
 
-  testSparkVersionLE33("test mergetree write with the path based") {
+  testSparkVersionLE33("test mergetree write with the path based bucket 
table") {
     val dataPath = s"s3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3"
 
     val sourceDF = spark.sql(s"""
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 52961a218e..d92cb15e75 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1826,7 +1826,7 @@ class GlutenClickHouseMergeTreeWriteSuite
     runTPCHQueryBySQL(6, q6("lineitem_mergetree_case_sensitive")) { _ => }
   }
 
-  testSparkVersionLE33("test mergetree with partition with whitespace") {
+  test("test mergetree with partition with whitespace") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS 
lineitem_mergetree_partition_with_whitespace;
                  |""".stripMargin)
diff --git 
a/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
index 808aba02a9..52f0f1d024 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
@@ -356,7 +356,7 @@ private:
         auto scaled_right = scale_right > 1 ? applyScaled(right, scale_right) 
: right;
 
         ScaledNativeType c_res = 0;
-        auto success = Operation::template apply(scaled_left, scaled_right, 
c_res);
+        auto success = Operation::template apply<>(scaled_left, scaled_right, 
c_res);
         if (!success)
             return false;
 
@@ -459,7 +459,7 @@ public:
             right_generic,
             removeNullable(arguments[2].type).get(),
             [&](const auto & left, const auto & right, const auto & result) {
-                return (res = SparkDecimalBinaryOperation<Operation, 
mode>::template executeDecimal(arguments, left, right, result))
+                return (res = SparkDecimalBinaryOperation<Operation, 
mode>::template executeDecimal<>(arguments, left, right, result))
                     != nullptr;
             });
 
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
index 372396598f..cbac37cd03 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
@@ -211,7 +211,9 @@ MergeTreeTableInstance::MergeTreeTableInstance(const 
std::string & info) : Merge
     while (!in.eof())
     {
         MergeTreePart part;
-        readString(part.name, in);
+        std::string encoded_name;
+        readString(encoded_name, in);
+        Poco::URI::decode(encoded_name, part.name);
         assertChar('\n', in);
         readIntText(part.begin, in);
         assertChar('\n', in);
diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
index 8d5fe20545..3ab7cc0e67 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
@@ -78,7 +78,7 @@ SparkStorageMergeTreePtr StorageMergeTreeFactory::getStorage(
 }
 
 DataPartsVector
-StorageMergeTreeFactory::getDataPartsByNames(const StorageID & id, const 
String & snapshot_id, std::unordered_set<String> part_name)
+StorageMergeTreeFactory::getDataPartsByNames(const StorageID & id, const 
String & snapshot_id, const std::unordered_set<String> & part_name)
 {
     DataPartsVector res;
     auto table_name = getTableName(id, snapshot_id);
diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h 
b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
index cc22563053..a853ee2f2a 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
@@ -67,7 +67,7 @@ public:
     static SparkStorageMergeTreePtr
     getStorage(const DB::StorageID& id, const String & snapshot_id, const 
MergeTreeTable & merge_tree_table,
         const std::function<SparkStorageMergeTreePtr()> & creator);
-    static DB::DataPartsVector getDataPartsByNames(const DB::StorageID & id, 
const String & snapshot_id, std::unordered_set<String> part_name);
+    static DB::DataPartsVector getDataPartsByNames(const DB::StorageID & id, 
const String & snapshot_id, const std::unordered_set<String> & part_name);
     static void init_cache_map()
     {
         auto config = 
MergeTreeConfig::loadFromContext(QueryContext::globalContext());
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 6b2785407d..732518ab77 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -219,7 +219,7 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer> 
read_buffer, const s
 class LocalFileReadBufferBuilder : public ReadBufferBuilder
 {
 public:
-    explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) : 
ReadBufferBuilder(context_) { }
+    explicit LocalFileReadBufferBuilder(const DB::ContextPtr & context_) : 
ReadBufferBuilder(context_) { }
     ~LocalFileReadBufferBuilder() override = default;
 
     bool isRemote() const override { return false; }
@@ -692,7 +692,7 @@ ReadBufferBuilder::ReadBufferBuilder(const DB::ContextPtr & 
context_) : context(
 }
 
 std::unique_ptr<DB::ReadBuffer>
-ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
+ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) const
 {
     /// Bzip2 compressed file is splittable and we need to adjust read range 
for each split
     auto * seekable = dynamic_cast<SeekableReadBuffer *>(in.release());
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
index 81a4c60bfc..038e864b8e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -49,7 +49,7 @@ protected:
     using ReadBufferCreator = 
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek, 
const DB::StoredObject & object)>;
 
     std::unique_ptr<DB::ReadBuffer>
-    wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info);
+    wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) const;
 
     ReadBufferCreator wrapWithCache(
         ReadBufferCreator read_buffer_creator,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to