This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 85d90c9d2 [CH] Fix load cache missing columns #7192
85d90c9d2 is described below
commit 85d90c9d2d371a2285bb74b565d7b36921d66dab
Author: LiuNeng <[email protected]>
AuthorDate: Wed Sep 11 15:54:06 2024 +0800
[CH] Fix load cache missing columns #7192
What changes were proposed in this pull request?
Fix MergeTree cache load failure when column names are upper case
How was this patch tested?
unit tests
(No UI changes are involved in this patch.)
---
.../apache/spark/rpc/GlutenExecutorEndpoint.scala | 7 +-
.../commands/GlutenCHCacheDataCommand.scala | 4 +-
... GlutenClickHouseMergeTreeCacheDataSuite.scala} | 195 ++++++++++++++++++++-
.../substrait/rel/ExtensionTableBuilder.java | 14 +-
4 files changed, 210 insertions(+), 10 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 7f2b94eea..559a22cb1 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -75,9 +75,12 @@ class GlutenExecutorEndpoint(val executorId: String, val
conf: SparkConf)
val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
- case _: Exception =>
+ case e: Exception =>
context.reply(
- CacheJobInfo(status = false, "", s"executor: $executorId cache
data failed."))
+ CacheJobInfo(
+ status = false,
+ "",
+ s"executor: $executorId cache data failed: ${e.getMessage}."))
}
case GlutenCacheLoadStatus(jobId) =>
val status = CHNativeCacheManager.getCacheStatus(jobId)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 43e3b4b7a..7aca290b1 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference,
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.commands.GlutenCacheBase._
+import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}
@@ -180,7 +181,8 @@ case class GlutenCHCacheDataCommand(
ClickhouseSnapshot.genSnapshotId(snapshot),
onePart.tablePath,
pathToCache.toString,
- snapshot.metadata.configuration.getOrElse("orderByKey", ""),
+ snapshot.metadata.configuration
+ .getOrElse("orderByKey",
MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY),
snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey",
""),
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
similarity index 67%
rename from
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
rename to
backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
index a55067185..88bb00fac 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -32,7 +32,7 @@ import scala.concurrent.duration.DurationInt
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
-class GlutenClickHouseMergeTreeCacheDataSSuite
+class GlutenClickHouseMergeTreeCacheDataSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
@@ -398,5 +398,198 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}
+
+ test("test cache mergetree data no partition columns") {
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+ |(
+ | L_ORDERKEY bigint,
+ | L_PARTKEY bigint,
+ | L_SUPPKEY bigint,
+ | L_LINENUMBER bigint,
+ | L_QUANTITY double,
+ | L_EXTENDEDPRICE double,
+ | L_DISCOUNT double,
+ | L_TAX double,
+ | L_RETURNFLAG string,
+ | L_LINESTATUS string,
+ | L_SHIPDATE date,
+ | L_COMMITDATE date,
+ | L_RECEIPTDATE date,
+ | L_SHIPINSTRUCT string,
+ | L_SHIPMODE string,
+ | L_COMMENT string
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_hdfs
+ | select * from lineitem a
+ | where a.l_shipdate between date'1995-01-01' and
date'1995-01-31'
+ |""".stripMargin)
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ val dataPath = new File(HDFS_CACHE_PATH)
+ val initial_cache_files = countFiles(dataPath)
+
+ val metaPath = new File(HDFS_METADATA_PATH +
s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ val res1 = spark.sql(s"cache data select * from
lineitem_mergetree_hdfs").collect()
+ assertResult(true)(res1(0).getBoolean(0))
+ assertResult(1)(metaPath.list().length)
+ assert(countFiles(dataPath) > initial_cache_files)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_hdfs
+ |WHERE
+ | l_shipdate >= date'1995-01-10'
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runSql(sqlStr)(
+ df => {
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assertResult(7898)(addFiles.map(_.rows).sum)
+ })
+ spark.sql("drop table lineitem_mergetree_hdfs purge")
+ }
+
+ test("test cache mergetree data with upper case column name") {
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+ |(
+ | L_ORDERKEY bigint,
+ | L_PARTKEY bigint,
+ | L_SUPPKEY bigint,
+ | L_LINENUMBER bigint,
+ | L_QUANTITY double,
+ | L_EXTENDEDPRICE double,
+ | L_DISCOUNT double,
+ | L_TAX double,
+ | L_RETURNFLAG string,
+ | L_LINESTATUS string,
+ | L_SHIPDATE date,
+ | L_COMMITDATE date,
+ | L_RECEIPTDATE date,
+ | L_SHIPINSTRUCT string,
+ | L_SHIPMODE string,
+ | L_COMMENT string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (L_SHIPDATE)
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |TBLPROPERTIES (storage_policy='__hdfs_main',
+ | orderByKey='L_LINENUMBER,L_ORDERKEY')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_hdfs
+ | select * from lineitem a
+ | where a.l_shipdate between date'1995-01-01' and
date'1995-01-31'
+ |""".stripMargin)
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ val dataPath = new File(HDFS_CACHE_PATH)
+ val initial_cache_files = countFiles(dataPath)
+
+ val res = spark
+ .sql(s"""
+ |cache data
+ | select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ | after L_SHIPDATE AS OF '1995-01-10'
+ | CACHEPROPERTIES(storage_policy='__hdfs_main',
+ | aaa='ccc')""".stripMargin)
+ .collect()
+ assertResult(true)(res(0).getBoolean(0))
+ val metaPath = new File(HDFS_METADATA_PATH +
s"$sparkVersion/test/lineitem_mergetree_hdfs")
+ assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+ assertResult(22)(metaPath.list().length)
+ assert(countFiles(dataPath) > initial_cache_files)
+ val first_cache_files = countFiles(dataPath)
+ val res1 = spark.sql(s"cache data select * from
lineitem_mergetree_hdfs").collect()
+ assertResult(true)(res1(0).getBoolean(0))
+ assertResult(31)(metaPath.list().length)
+ assert(countFiles(dataPath) > first_cache_files)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_hdfs
+ |WHERE
+ | l_shipdate >= date'1995-01-10'
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runSql(sqlStr)(
+ df => {
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assertResult(7898)(addFiles.map(_.rows).sum)
+ })
+ spark.sql("drop table lineitem_mergetree_hdfs purge")
+ }
}
// scalastyle:off line.size.limit
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
index 34c017d80..b2b813dd9 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.substrait.rel;
+import org.apache.gluten.expression.ConverterUtils;
+
import java.util.List;
import java.util.Map;
@@ -50,12 +52,12 @@ public class ExtensionTableBuilder {
snapshotId,
relativeTablePath,
absoluteTablePath,
- orderByKey,
- lowCardKey,
- minmaxIndexKey,
- bfIndexKey,
- setIndexKey,
- primaryKey,
+ ConverterUtils.normalizeColName(orderByKey),
+ ConverterUtils.normalizeColName(lowCardKey),
+ ConverterUtils.normalizeColName(minmaxIndexKey),
+ ConverterUtils.normalizeColName(bfIndexKey),
+ ConverterUtils.normalizeColName(setIndexKey),
+ ConverterUtils.normalizeColName(primaryKey),
partList,
starts,
lengths,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]