This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new eaf6548e3b [GLUTEN-7028][CH][Part-15] [MINOR] Fix UTs (#8364)
eaf6548e3b is described below
commit eaf6548e3b31988048450d564576fbff6656a05b
Author: Chang chen <[email protected]>
AuthorDate: Mon Dec 30 11:41:07 2024 +0800
[GLUTEN-7028][CH][Part-15] [MINOR] Fix UTs (#8364)
* [Fix UT] spark.databricks.delta.stats.skipping -> false
* [Fix UT] Bucket table not supported
* [Fix UT] 'test cache mergetree data no partition columns' already fixed
by (#8346)
* [UT] enable previously ignored test
* [MINOR REFACTOR] Pass by const reference instead of pass by value
* [MINOR REFACTOR] validatedPartitionID
* [Fix Bug] decode part name
* clang 19 fix
---
.../delta/ClickhouseOptimisticTransaction.scala | 25 ++-
.../sql/execution/FileDeltaColumnarWrite.scala | 10 +-
.../execution/MergeTreeDeltaColumnarWrite.scala | 10 +-
.../spark/sql/execution/CHColumnarWrite.scala | 14 +-
.../GlutenClickHouseMergeTreeCacheDataSuite.scala | 2 +-
...tenClickHouseMergeTreePathBasedWriteSuite.scala | 185 +++++++++++----------
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 2 +-
...eMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala | 2 +-
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 2 +-
.../GlutenClickHouseMergeTreeWriteSuite.scala | 2 +-
.../SparkFunctionDecimalBinaryArithmetic.cpp | 4 +-
.../Storages/MergeTree/SparkMergeTreeMeta.cpp | 4 +-
.../Storages/MergeTree/StorageMergeTreeFactory.cpp | 2 +-
.../Storages/MergeTree/StorageMergeTreeFactory.h | 2 +-
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 4 +-
.../Storages/SubstraitSource/ReadBufferBuilder.h | 2 +-
16 files changed, 146 insertions(+), 126 deletions(-)
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 6c7b2ab876..a4411e83f2 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -340,20 +340,19 @@ class ClickhouseOptimisticTransaction(
var resultFiles =
(if (optionalStatsTracker.isDefined) {
- committer.addedStatuses.map {
- a =>
- a.copy(stats =
-
optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
- }
- } else {
- committer.addedStatuses
- })
+ committer.addedStatuses.map { a =>
+ a.copy(stats = optionalStatsTracker.map(
+ _.recordedStats(a.toPath.getName)).getOrElse(a.stats))
+ }
+ }
+ else {
+ committer.addedStatuses
+ })
.filter {
- // In some cases, we can write out an empty `inputData`. Some
examples of this (though,
- // they may be fixed in the future) are the MERGE command when you
delete with empty
- // source, or empty target, or on disjoint tables. This is hard to
catch before
- // the write without collecting the DF ahead of time. Instead,
- // we can return only the AddFiles that
+ // In some cases, we can write out an empty `inputData`. Some
examples of this (though, they
+ // may be fixed in the future) are the MERGE command when you delete
with empty source, or
+ // empty target, or on disjoint tables. This is hard to catch before
the write without
+ // collecting the DF ahead of time. Instead, we can return only the
AddFiles that
// a) actually add rows, or
// b) don't have any stats so we don't know the number of rows at all
case a: AddFile => a.numLogicalRecords.forall(_ > 0)
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
index b02eed88e6..4d6040bae8 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.clickhouse.RuntimeSettings
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.vectorized.NativeExpressionEvaluator
-
+import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow
@@ -37,11 +37,11 @@ case class DeltaFileCommitInfo(committer:
FileDelayedCommitProtocol)
val addedFiles: ArrayBuffer[(Map[String, String], String)] =
new ArrayBuffer[(Map[String, String], String)]
override def apply(stat: NativeFileWriteResult): Unit = {
- if (stat.partition_id == CHColumnarWrite.EMPTY_PARTITION_ID) {
- addedFiles.append((Map.empty[String, String], stat.filename))
- } else {
+ if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
val partitionValues = committer.parsePartitions(stat.partition_id)
- addedFiles.append((partitionValues, stat.relativePath))
+ addedFiles.append((partitionValues, new
Path(stat.relativePath).toUri.toString))
+ } else {
+ addedFiles.append((Map.empty[String, String], stat.filename))
}
}
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
index df33fde66c..d88b6d90b7 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/MergeTreeDeltaColumnarWrite.scala
@@ -52,10 +52,10 @@ case class MergeTreeWriteResult(
path: Path,
modificationTime: Long,
hostName: Seq[String]): FileAction = {
- val (partitionValues, part_path) = if (partition_id ==
CHColumnarWrite.EMPTY_PARTITION_ID) {
- (Map.empty[String, String], part_name)
- } else {
+ val (partitionValues, part_path) = if
(CHColumnarWrite.validatedPartitionID(partition_id)) {
(MergeTreePartitionUtils.parsePartitions(partition_id),
s"$partition_id/$part_name")
+ } else {
+ (Map.empty[String, String], part_name)
}
val tags = Map[String, String](
"database" -> database,
@@ -88,7 +88,7 @@ case class MergeTreeWriteResult(
DeltaStatistics.NULL_COUNT -> ""
)
AddFile(
- part_path,
+ new Path(part_path).toUri.toString,
partitionValues,
size_in_bytes,
modificationTime,
@@ -153,7 +153,7 @@ case class MergeTreeBasicWriteTaskStatsTracker() extends
(MergeTreeWriteResult =
private var numFiles: Int = 0
def apply(stat: MergeTreeWriteResult): Unit = {
- if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
+ if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
partitions.append(new GenericInternalRow(Array[Any](stat.partition_id)))
}
numFiles += 1
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
index 87fc50ef3a..e453fc46a3 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
@@ -158,10 +158,10 @@ case class HadoopMapReduceAdapter(sparkCommitter:
HadoopMapReduceCommitProtocol)
}
case class NativeFileWriteResult(filename: String, partition_id: String,
record_count: Long) {
- lazy val relativePath: String = if (partition_id ==
CHColumnarWrite.EMPTY_PARTITION_ID) {
- filename
- } else {
+ lazy val relativePath: String = if
(CHColumnarWrite.validatedPartitionID(partition_id)) {
s"$partition_id/$filename"
+ } else {
+ filename
}
}
@@ -212,7 +212,7 @@ case class NativeBasicWriteTaskStatsTracker(
private var numWrittenRows: Long = 0
override def apply(stat: NativeFileWriteResult): Unit = {
val absolutePath = s"$writeDir/${stat.relativePath}"
- if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
+ if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
basicWriteJobStatsTracker.newPartition(new
GenericInternalRow(Array[Any](stat.partition_id)))
}
basicWriteJobStatsTracker.newFile(absolutePath)
@@ -233,7 +233,7 @@ case class FileCommitInfo(description: WriteJobDescription)
def apply(stat: NativeFileWriteResult): Unit = {
val tmpAbsolutePath = s"${description.path}/${stat.relativePath}"
- if (stat.partition_id != CHColumnarWrite.EMPTY_PARTITION_ID) {
+ if (CHColumnarWrite.validatedPartitionID(stat.partition_id)) {
partitions += stat.partition_id
val customOutputPath =
description.customPartitionLocations.get(
@@ -325,5 +325,7 @@ object CHColumnarWrite {
case other => CHDeltaColumnarWrite(jobTrackerID, description, other)
}
- val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"
+ private val EMPTY_PARTITION_ID = "__NO_PARTITION_ID__"
+
+ def validatedPartitionID(partitionID: String): Boolean = partitionID !=
EMPTY_PARTITION_ID
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index ffb1fa4012..e8cefca818 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -401,7 +401,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
spark.sql("drop table lineitem_mergetree_hdfs purge")
}
- testSparkVersionLE33("test cache mergetree data no partition columns") {
+ test("test cache mergetree data no partition columns") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 923def03e9..d09f48b59b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -302,7 +302,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
}
}
- testSparkVersionLE33("test mergetree path based table update") {
+ test("test mergetree path based table update") {
val dataPath = s"$basePath/lineitem_mergetree_update"
clearDataPath(dataPath)
@@ -315,78 +315,87 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.mode(SaveMode.Append)
.save(dataPath)
- spark.sql(s"""
- | update clickhouse.`$dataPath` set l_returnflag = 'Z' where
l_orderkey = 12647
- |""".stripMargin)
-
- {
- val df = spark.read
- .format("clickhouse")
- .load(dataPath)
- .where("l_returnflag = 'Z'")
- assertResult(1)(df.count())
- val scanExec = collect(df.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
- }
- assertResult(1)(scanExec.size)
+ /**
+ * TODO: new test for (spark.databricks.delta.stats.skipping -> true)
+ *
+ * Since one pipeline write will collect stats, so that pruning will be
more accurate in point
+ * query. Let's add a new test when we implement lightweight update and
delete.
+ */
+ withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
+ spark.sql(s"""
+ | update clickhouse.`$dataPath` set l_returnflag = 'Z'
where l_orderkey = 12647
+ |""".stripMargin)
+
+ {
+ val df = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ .where("l_returnflag = 'Z'")
+ assertResult(1)(df.count())
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
- val mergetreeScan = scanExec.head
- assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
- val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
- assert(
- ClickHouseTableV2
- .getTable(fileIndex.deltaLog)
- .orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
- assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
-
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
- val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assertResult(600572)(addFiles.map(_.rows).sum)
- // 4 parts belong to the first batch
- // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
- assertResult(6)(addFiles.size)
- val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
- assertResult(2)(filePaths.size)
- assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
- }
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKey === StorageMeta.DEFAULT_ORDER_BY_KEY)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKey.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assertResult(600572)(addFiles.map(_.rows).sum)
+ // 4 parts belong to the first batch
+ // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
+ assertResult(6)(addFiles.size)
+ val filePaths =
+ addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
+ assertResult(2)(filePaths.size)
+ assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
+ }
- val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
- clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" ->
"'X'"))
+ val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
+ clickhouseTable.updateExpr("l_orderkey = 10086", Map("l_returnflag" ->
"'X'"))
- {
- val df = spark.read
- .format("clickhouse")
- .load(dataPath)
- .where("l_returnflag = 'X'")
- assertResult(1)(df.count())
- val scanExec = collect(df.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
- }
- assertResult(1)(scanExec.size)
+ {
+ val df = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ .where("l_returnflag = 'X'")
+ assertResult(1)(df.count())
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
- val mergetreeScan = scanExec.head
- assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("ScanTransformer mergetree"))
- val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
- val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- assertResult(600572)(addFiles.map(_.rows).sum)
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assertResult(600572)(addFiles.map(_.rows).sum)
- // 4 parts belong to the first batch
- // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
- assertResult(6)(addFiles.size)
- val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
- assertResult(2)(filePaths.size)
+ // 4 parts belong to the first batch
+ // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
+ assertResult(6)(addFiles.size)
+ val filePaths =
+ addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
+ assertResult(2)(filePaths.size)
+ }
}
-
val df = spark.read
.format("clickhouse")
.load(dataPath)
assertResult(600572)(df.count())
}
- testSparkVersionLE33("test mergetree path based table delete") {
+ test("test mergetree path based table delete") {
val dataPath = s"$basePath/lineitem_mergetree_delete"
clearDataPath(dataPath)
@@ -399,32 +408,40 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.mode(SaveMode.Append)
.save(dataPath)
- spark.sql(s"""
- | delete from clickhouse.`$dataPath` where l_orderkey = 12647
- |""".stripMargin)
- val df = spark.read
- .format("clickhouse")
- .load(dataPath)
- assertResult(600571)(df.count())
- val scanExec = collect(df.queryExecution.executedPlan) {
- case f: FileSourceScanExecTransformer => f
+ /**
+ * TODO: new test for (spark.databricks.delta.stats.skipping -> true)
+ *
+ * Since one pipeline write will collect stats, so that pruning will be
more accurate in point
+ * query. Let's add a new test when we implement lightweight update and
delete.
+ */
+ withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
+ spark.sql(s"""
+ | delete from clickhouse.`$dataPath` where l_orderkey =
12647
+ |""".stripMargin)
+ val df = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ assertResult(600571)(df.count())
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ val mergetreeScan = scanExec.head
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ // 4 parts belong to the first batch
+ // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
+ assertResult(6)(addFiles.size)
+ val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
+ assertResult(2)(filePaths.size)
+ assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
+
+ val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
+ clickhouseTable.delete("mod(l_orderkey, 3) = 2")
+ val df1 = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ assertResult(400089)(df1.count())
}
- val mergetreeScan = scanExec.head
- val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
- val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- // 4 parts belong to the first batch
- // 2 parts belong to the second batch (1 actual updated part, 1 passively
updated).
- assertResult(6)(addFiles.size)
- val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
- assertResult(2)(filePaths.size)
- assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
-
- val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
- clickhouseTable.delete("mod(l_orderkey, 3) = 2")
- val df1 = spark.read
- .format("clickhouse")
- .load(dataPath)
- assertResult(400089)(df1.count())
}
test("test mergetree path based table upsert") {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 91a22c4f34..7bac0d6dcf 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -436,7 +436,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}
- testSparkVersionLE33("test mergetree write with the path based") {
+ testSparkVersionLE33("test mergetree write with the path based bucket
table") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
val sourceDF = spark.sql(s"""
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
index d4a5ddd3a8..980e696255 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -435,7 +435,7 @@ class
GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs purge")
}
- testSparkVersionLE33("test mergetree write with the path based") {
+ testSparkVersionLE33("test mergetree write with the path based bucket
table") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
val sourceDF = spark.sql(s"""
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 4846852256..74046e4ed4 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -496,7 +496,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
spark.sql("drop table lineitem_mergetree_bucket_s3")
}
- testSparkVersionLE33("test mergetree write with the path based") {
+ testSparkVersionLE33("test mergetree write with the path based bucket
table") {
val dataPath = s"s3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3"
val sourceDF = spark.sql(s"""
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index 52961a218e..d92cb15e75 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1826,7 +1826,7 @@ class GlutenClickHouseMergeTreeWriteSuite
runTPCHQueryBySQL(6, q6("lineitem_mergetree_case_sensitive")) { _ => }
}
- testSparkVersionLE33("test mergetree with partition with whitespace") {
+ test("test mergetree with partition with whitespace") {
spark.sql(s"""
|DROP TABLE IF EXISTS
lineitem_mergetree_partition_with_whitespace;
|""".stripMargin)
diff --git
a/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
b/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
index 808aba02a9..52f0f1d024 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
@@ -356,7 +356,7 @@ private:
auto scaled_right = scale_right > 1 ? applyScaled(right, scale_right)
: right;
ScaledNativeType c_res = 0;
- auto success = Operation::template apply(scaled_left, scaled_right,
c_res);
+ auto success = Operation::template apply<>(scaled_left, scaled_right,
c_res);
if (!success)
return false;
@@ -459,7 +459,7 @@ public:
right_generic,
removeNullable(arguments[2].type).get(),
[&](const auto & left, const auto & right, const auto & result) {
- return (res = SparkDecimalBinaryOperation<Operation,
mode>::template executeDecimal(arguments, left, right, result))
+ return (res = SparkDecimalBinaryOperation<Operation,
mode>::template executeDecimal<>(arguments, left, right, result))
!= nullptr;
});
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
index 372396598f..cbac37cd03 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
@@ -211,7 +211,9 @@ MergeTreeTableInstance::MergeTreeTableInstance(const
std::string & info) : Merge
while (!in.eof())
{
MergeTreePart part;
- readString(part.name, in);
+ std::string encoded_name;
+ readString(encoded_name, in);
+ Poco::URI::decode(encoded_name, part.name);
assertChar('\n', in);
readIntText(part.begin, in);
assertChar('\n', in);
diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
index 8d5fe20545..3ab7cc0e67 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
@@ -78,7 +78,7 @@ SparkStorageMergeTreePtr StorageMergeTreeFactory::getStorage(
}
DataPartsVector
-StorageMergeTreeFactory::getDataPartsByNames(const StorageID & id, const
String & snapshot_id, std::unordered_set<String> part_name)
+StorageMergeTreeFactory::getDataPartsByNames(const StorageID & id, const
String & snapshot_id, const std::unordered_set<String> & part_name)
{
DataPartsVector res;
auto table_name = getTableName(id, snapshot_id);
diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
index cc22563053..a853ee2f2a 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
@@ -67,7 +67,7 @@ public:
static SparkStorageMergeTreePtr
getStorage(const DB::StorageID& id, const String & snapshot_id, const
MergeTreeTable & merge_tree_table,
const std::function<SparkStorageMergeTreePtr()> & creator);
- static DB::DataPartsVector getDataPartsByNames(const DB::StorageID & id,
const String & snapshot_id, std::unordered_set<String> part_name);
+ static DB::DataPartsVector getDataPartsByNames(const DB::StorageID & id,
const String & snapshot_id, const std::unordered_set<String> & part_name);
static void init_cache_map()
{
auto config =
MergeTreeConfig::loadFromContext(QueryContext::globalContext());
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 6b2785407d..732518ab77 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -219,7 +219,7 @@ adjustReadRangeIfNeeded(std::unique_ptr<SeekableReadBuffer>
read_buffer, const s
class LocalFileReadBufferBuilder : public ReadBufferBuilder
{
public:
- explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) :
ReadBufferBuilder(context_) { }
+ explicit LocalFileReadBufferBuilder(const DB::ContextPtr & context_) :
ReadBufferBuilder(context_) { }
~LocalFileReadBufferBuilder() override = default;
bool isRemote() const override { return false; }
@@ -692,7 +692,7 @@ ReadBufferBuilder::ReadBufferBuilder(const DB::ContextPtr &
context_) : context(
}
std::unique_ptr<DB::ReadBuffer>
-ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info)
+ReadBufferBuilder::wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) const
{
/// Bzip2 compressed file is splittable and we need to adjust read range
for each split
auto * seekable = dynamic_cast<SeekableReadBuffer *>(in.release());
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
index 81a4c60bfc..038e864b8e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -49,7 +49,7 @@ protected:
using ReadBufferCreator =
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek,
const DB::StoredObject & object)>;
std::unique_ptr<DB::ReadBuffer>
- wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info);
+ wrapWithBzip2(std::unique_ptr<DB::ReadBuffer> in, const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info) const;
ReadBufferCreator wrapWithCache(
ReadBufferCreator read_buffer_creator,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]