This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b93b098ed [GLUTEN-6589][CH] Mergetree supported spark.sql.caseSensitive (#6592)
b93b098ed is described below

commit b93b098eda072dbb73903550d4848dfde09180c0
Author: Shuai li <[email protected]>
AuthorDate: Fri Jul 26 11:54:50 2024 +0800

    [GLUTEN-6589][CH] Mergetree supported spark.sql.caseSensitive (#6592)
    
    [CH] Mergetree supported spark.sql.caseSensitive
---
 .../sql/delta/catalog/ClickHouseTableV2Base.scala  | 33 ++++----------
 .../datasources/utils/MergeTreeDeltaUtil.scala     | 11 ++++-
 .../datasources/v1/CHMergeTreeWriterInjects.scala  | 20 ++-------
 .../GlutenClickHouseMergeTreeWriteSuite.scala      | 51 ++++++++++++++++++++++
 .../apache/gluten/expression/ConverterUtils.scala  |  2 +-
 5 files changed, 74 insertions(+), 43 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 9c129b9f5..633d23f77 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -16,8 +16,11 @@
  */
 package org.apache.spark.sql.delta.catalog
 
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
 
 import org.apache.hadoop.fs.Path
 
@@ -153,33 +156,15 @@ trait ClickHouseTableV2Base {
     configs.toMap
   }
 
-  def primaryKey(): String = primaryKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
+  def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption)
 
   def orderByKey(): String = orderByKeyOption match {
-    case Some(keys) => keys.mkString(",")
+    case Some(keys) => keys.map(normalizeColName).mkString(",")
     case None => "tuple()"
   }
 
-  def lowCardKey(): String = lowCardKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
-
-  def minmaxIndexKey(): String = minmaxIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
-
-  def bfIndexKey(): String = bfIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
-
-  def setIndexKey(): String = setIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
+  def lowCardKey(): String = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
+  def minmaxIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
+  def bfIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
+  def setIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
index 954b43b6a..6b2af0953 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.utils
 
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
 object MergeTreeDeltaUtil {
 
   val DEFAULT_ORDER_BY_KEY = "tuple()"
@@ -25,7 +27,7 @@ object MergeTreeDeltaUtil {
       primaryKeyOption: Option[Seq[String]]): (String, String) = {
     val orderByKey =
       if (orderByKeyOption.isDefined && orderByKeyOption.get.nonEmpty) {
-        orderByKeyOption.get.mkString(",")
+        columnsToStr(orderByKeyOption)
       } else DEFAULT_ORDER_BY_KEY
 
     val primaryKey =
@@ -33,9 +35,14 @@ object MergeTreeDeltaUtil {
         !orderByKey.equals(DEFAULT_ORDER_BY_KEY) && primaryKeyOption.isDefined &&
         primaryKeyOption.get.nonEmpty
       ) {
-        primaryKeyOption.get.mkString(",")
+        columnsToStr(primaryKeyOption)
       } else ""
 
     (orderByKey, primaryKey)
   }
+
+  def columnsToStr(option: Option[Seq[String]]): String = option match {
+    case Some(keys) => keys.map(normalizeColName).mkString(",")
+    case None => ""
+  }
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index e11406d56..237d5a46d 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -170,22 +170,10 @@ object CHMergeTreeWriterInjects {
       primaryKeyOption
     )
 
-    val lowCardKey = lowCardKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
-    val minmaxIndexKey = minmaxIndexKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
-    val bfIndexKey = bfIndexKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
-    val setIndexKey = setIndexKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
+    val lowCardKey = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
+    val minmaxIndexKey = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
+    val bfIndexKey = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
+    val setIndexKey = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
 
     val substraitContext = new SubstraitContext
     val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index e88eb1fed..2563d792b 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1971,5 +1971,56 @@ class GlutenClickHouseMergeTreeWriteSuite
         }
       })
   }
+
+  test("test mergetree with column case sensitive") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS LINEITEM_MERGETREE_CASE_SENSITIVE;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS LINEITEM_MERGETREE_CASE_SENSITIVE
+                 |(
+                 | L_ORDERKEY      bigint,
+                 | L_PARTKEY       bigint,
+                 | L_SUPPKEY       bigint,
+                 | L_LINENUMBER    bigint,
+                 | L_QUANTITY      double,
+                 | L_EXTENDEDPRICE double,
+                 | L_DISCOUNT      double,
+                 | L_TAX           double,
+                 | L_RETURNFLAG    string,
+                 | L_LINESTATUS    string,
+                 | L_SHIPDATE      date,
+                 | L_COMMITDATE    date,
+                 | L_RECEIPTDATE   date,
+                 | L_SHIPINSTRUCT  string,
+                 | L_SHIPMODE      string,
+                 | L_COMMENT       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (L_SHIPDATE)
+                 |TBLPROPERTIES (orderByKey='L_DISCOUNT')
+                 |LOCATION '$basePath/LINEITEM_MERGETREE_CASE_SENSITIVE'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_case_sensitive
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    sum(l_extendedprice * l_discount) AS revenue
+         |FROM
+         |    lineitem_mergetree_case_sensitive
+         |WHERE
+         |    l_shipdate >= date'1994-01-01'
+         |    AND l_shipdate < date'1994-01-01' + interval 1 year
+         |    AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+         |    AND l_quantity < 24
+         |""".stripMargin
+    runTPCHQueryBySQL(6, sqlStr) { _ => }
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
index 473ee7f9d..4b929e525 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
@@ -138,7 +138,7 @@ object ConverterUtils extends Logging {
   /** Convert StructType to Json */
   def convertNamedStructJson(tableSchema: StructType): String = {
     val typeNodes = ConverterUtils.collectAttributeTypeNodes(tableSchema)
-    val nameList = tableSchema.fieldNames
+    val nameList = tableSchema.fieldNames.map(normalizeColName)
 
     val structBuilder = Type.Struct.newBuilder
     for (typeNode <- typeNodes.asScala) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to