This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 85d90c9d2 [CH] Fix load cache missing columns #7192
85d90c9d2 is described below

commit 85d90c9d2d371a2285bb74b565d7b36921d66dab
Author: LiuNeng <[email protected]>
AuthorDate: Wed Sep 11 15:54:06 2024 +0800

    [CH] Fix load cache missing columns #7192
    
    What changes were proposed in this pull request?
    Fix MergeTree cache load failure when column names are upper case
    
    How was this patch tested?
    unit tests
    
---
 .../apache/spark/rpc/GlutenExecutorEndpoint.scala  |   7 +-
 .../commands/GlutenCHCacheDataCommand.scala        |   4 +-
 ... GlutenClickHouseMergeTreeCacheDataSuite.scala} | 195 ++++++++++++++++++++-
 .../substrait/rel/ExtensionTableBuilder.java       |  14 +-
 4 files changed, 210 insertions(+), 10 deletions(-)
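
For context on the failure mode: the native cache manager looks up MergeTree
parts by column name, and the key strings carried in the extension table must
match the normalized form the native side expects, so a table declared with
upper-case columns such as L_ORDERKEY failed to load its cache. Below is a
minimal Scala sketch of the idea only; the lower-casing rule and the Set-based
lookup are illustrative assumptions, not the actual Gluten implementation
(which delegates to ConverterUtils.normalizeColName).

    // Sketch only: why case normalization fixes the cache miss.
    // Lower-casing is an assumed normalization rule, for illustration.
    object CacheKeySketch {
      def normalizeColName(name: String): String = name.toLowerCase

      def main(args: Array[String]): Unit = {
        val storedKeys = Set("l_linenumber", "l_orderkey") // normalized form
        val requested = "L_ORDERKEY"                       // as declared in DDL
        assert(!storedKeys.contains(requested))            // raw lookup misses
        assert(storedKeys.contains(normalizeColName(requested))) // normalized hits
      }
    }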

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 7f2b94eea..559a22cb1 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -75,9 +75,12 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
         val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
         context.reply(CacheJobInfo(status = true, jobId))
       } catch {
-        case _: Exception =>
+        case e: Exception =>
           context.reply(
-            CacheJobInfo(status = false, "", s"executor: $executorId cache data failed."))
+            CacheJobInfo(
+              status = false,
+              "",
+              s"executor: $executorId cache data failed: ${e.getMessage}."))
       }
     case GlutenCacheLoadStatus(jobId) =>
       val status = CHNativeCacheManager.getCacheStatus(jobId)
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 43e3b4b7a..7aca290b1 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.commands.GlutenCacheBase._
+import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.{BooleanType, StringType}
 
@@ -180,7 +181,8 @@ case class GlutenCHCacheDataCommand(
             ClickhouseSnapshot.genSnapshotId(snapshot),
             onePart.tablePath,
             pathToCache.toString,
-            snapshot.metadata.configuration.getOrElse("orderByKey", ""),
+            snapshot.metadata.configuration
+              .getOrElse("orderByKey", 
MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY),
             snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
             snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
             snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", 
""),
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
similarity index 67%
rename from backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
rename to backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
index a55067185..88bb00fac 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -32,7 +32,7 @@ import scala.concurrent.duration.DurationInt
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
 
-class GlutenClickHouseMergeTreeCacheDataSSuite
+class GlutenClickHouseMergeTreeCacheDataSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
 
@@ -398,5 +398,198 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
       })
     spark.sql("drop table lineitem_mergetree_hdfs purge")
   }
+
+  test("test cache mergetree data no partition columns") {
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | L_ORDERKEY      bigint,
+                 | L_PARTKEY       bigint,
+                 | L_SUPPKEY       bigint,
+                 | L_LINENUMBER    bigint,
+                 | L_QUANTITY      double,
+                 | L_EXTENDEDPRICE double,
+                 | L_DISCOUNT      double,
+                 | L_TAX           double,
+                 | L_RETURNFLAG    string,
+                 | L_LINESTATUS    string,
+                 | L_SHIPDATE      date,
+                 | L_COMMITDATE    date,
+                 | L_RECEIPTDATE   date,
+                 | L_SHIPINSTRUCT  string,
+                 | L_SHIPMODE      string,
+                 | L_COMMENT       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
+
+    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
+    assertResult(true)(res1(0).getBoolean(0))
+    assertResult(1)(metaPath.list().length)
+    assert(countFiles(dataPath) > initial_cache_files)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_hdfs
+         |WHERE
+         |    l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+    spark.sql("drop table lineitem_mergetree_hdfs purge")
+  }
+
+  test("test cache mergetree data with upper case column name") {
+
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | L_ORDERKEY      bigint,
+                 | L_PARTKEY       bigint,
+                 | L_SUPPKEY       bigint,
+                 | L_LINENUMBER    bigint,
+                 | L_QUANTITY      double,
+                 | L_EXTENDEDPRICE double,
+                 | L_DISCOUNT      double,
+                 | L_TAX           double,
+                 | L_RETURNFLAG    string,
+                 | L_LINESTATUS    string,
+                 | L_SHIPDATE      date,
+                 | L_COMMITDATE    date,
+                 | L_RECEIPTDATE   date,
+                 | L_SHIPINSTRUCT  string,
+                 | L_SHIPMODE      string,
+                 | L_COMMENT       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (L_SHIPDATE)
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 |               orderByKey='L_LINENUMBER,L_ORDERKEY')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem a
+                 | where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
+
+    val res = spark
+      .sql(s"""
+              |cache data
+              |  select * from '$HDFS_URL/test/lineitem_mergetree_hdfs'
+              |  after L_SHIPDATE AS OF '1995-01-10'
+              |  CACHEPROPERTIES(storage_policy='__hdfs_main',
+              |                aaa='ccc')""".stripMargin)
+      .collect()
+    assertResult(true)(res(0).getBoolean(0))
+    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    assertResult(true)(metaPath.exists() && metaPath.isDirectory)
+    assertResult(22)(metaPath.list().length)
+    assert(countFiles(dataPath) > initial_cache_files)
+    val first_cache_files = countFiles(dataPath)
+    val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
+    assertResult(true)(res1(0).getBoolean(0))
+    assertResult(31)(metaPath.list().length)
+    assert(countFiles(dataPath) > first_cache_files)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_hdfs
+         |WHERE
+         |    l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
+    spark.sql("drop table lineitem_mergetree_hdfs purge")
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
index 34c017d80..b2b813dd9 100644
--- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
+++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.substrait.rel;
 
+import org.apache.gluten.expression.ConverterUtils;
+
 import java.util.List;
 import java.util.Map;
 
@@ -50,12 +52,12 @@ public class ExtensionTableBuilder {
         snapshotId,
         relativeTablePath,
         absoluteTablePath,
-        orderByKey,
-        lowCardKey,
-        minmaxIndexKey,
-        bfIndexKey,
-        setIndexKey,
-        primaryKey,
+        ConverterUtils.normalizeColName(orderByKey),
+        ConverterUtils.normalizeColName(lowCardKey),
+        ConverterUtils.normalizeColName(minmaxIndexKey),
+        ConverterUtils.normalizeColName(bfIndexKey),
+        ConverterUtils.normalizeColName(setIndexKey),
+        ConverterUtils.normalizeColName(primaryKey),
         partList,
         starts,
         lengths,
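
The builder now normalizes every key string before constructing the extension
table, which also covers comma-separated key lists such as the
orderByKey='L_LINENUMBER,L_ORDERKEY' written in the new test. A short Scala
sketch under the same assumed lower-casing rule as above:

    // Sketch: normalization applied uniformly, including comma-separated
    // key lists; empty strings pass through unchanged.
    object NormalizeKeysSketch {
      def normalizeColName(name: String): String = name.toLowerCase

      def main(args: Array[String]): Unit = {
        val keys = Seq("L_LINENUMBER,L_ORDERKEY", "L_SHIPDATE", "")
        println(keys.map(normalizeColName))
        // List(l_linenumber,l_orderkey, l_shipdate, )
      }
    }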


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
