This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b93b098ed [GLUTEN-6589][CH] Mergetree supported
spark.sql.caseSensitive (#6592)
b93b098ed is described below
commit b93b098eda072dbb73903550d4848dfde09180c0
Author: Shuai li <[email protected]>
AuthorDate: Fri Jul 26 11:54:50 2024 +0800
[GLUTEN-6589][CH] Mergetree supported spark.sql.caseSensitive (#6592)
[CH] Mergetree supported spark.sql.caseSensitive
---
.../sql/delta/catalog/ClickHouseTableV2Base.scala | 33 ++++----------
.../datasources/utils/MergeTreeDeltaUtil.scala | 11 ++++-
.../datasources/v1/CHMergeTreeWriterInjects.scala | 20 ++-------
.../GlutenClickHouseMergeTreeWriteSuite.scala | 51 ++++++++++++++++++++++
.../apache/gluten/expression/ConverterUtils.scala | 2 +-
5 files changed, 74 insertions(+), 43 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 9c129b9f5..633d23f77 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -16,8 +16,11 @@
*/
package org.apache.spark.sql.delta.catalog
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
import org.apache.hadoop.fs.Path
@@ -153,33 +156,15 @@ trait ClickHouseTableV2Base {
configs.toMap
}
- def primaryKey(): String = primaryKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
+ def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption)
def orderByKey(): String = orderByKeyOption match {
- case Some(keys) => keys.mkString(",")
+ case Some(keys) => keys.map(normalizeColName).mkString(",")
case None => "tuple()"
}
- def lowCardKey(): String = lowCardKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
-
- def minmaxIndexKey(): String = minmaxIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
-
- def bfIndexKey(): String = bfIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
-
- def setIndexKey(): String = setIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
+ def lowCardKey(): String = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
+  def minmaxIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
+ def bfIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
+  def setIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
index 954b43b6a..6b2af0953 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.utils
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
object MergeTreeDeltaUtil {
val DEFAULT_ORDER_BY_KEY = "tuple()"
@@ -25,7 +27,7 @@ object MergeTreeDeltaUtil {
primaryKeyOption: Option[Seq[String]]): (String, String) = {
val orderByKey =
if (orderByKeyOption.isDefined && orderByKeyOption.get.nonEmpty) {
- orderByKeyOption.get.mkString(",")
+ columnsToStr(orderByKeyOption)
} else DEFAULT_ORDER_BY_KEY
val primaryKey =
@@ -33,9 +35,14 @@ object MergeTreeDeltaUtil {
        !orderByKey.equals(DEFAULT_ORDER_BY_KEY) && primaryKeyOption.isDefined &&
        primaryKeyOption.get.nonEmpty
) {
- primaryKeyOption.get.mkString(",")
+ columnsToStr(primaryKeyOption)
} else ""
(orderByKey, primaryKey)
}
+
+ def columnsToStr(option: Option[Seq[String]]): String = option match {
+ case Some(keys) => keys.map(normalizeColName).mkString(",")
+ case None => ""
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index e11406d56..237d5a46d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -170,22 +170,10 @@ object CHMergeTreeWriterInjects {
primaryKeyOption
)
- val lowCardKey = lowCardKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
- val minmaxIndexKey = minmaxIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
- val bfIndexKey = bfIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
- val setIndexKey = setIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
+ val lowCardKey = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
+ val minmaxIndexKey = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
+ val bfIndexKey = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
+ val setIndexKey = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
val substraitContext = new SubstraitContext
val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index e88eb1fed..2563d792b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1971,5 +1971,56 @@ class GlutenClickHouseMergeTreeWriteSuite
}
})
}
+
+ test("test mergetree with column case sensitive") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS LINEITEM_MERGETREE_CASE_SENSITIVE;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS LINEITEM_MERGETREE_CASE_SENSITIVE
+ |(
+ | L_ORDERKEY bigint,
+ | L_PARTKEY bigint,
+ | L_SUPPKEY bigint,
+ | L_LINENUMBER bigint,
+ | L_QUANTITY double,
+ | L_EXTENDEDPRICE double,
+ | L_DISCOUNT double,
+ | L_TAX double,
+ | L_RETURNFLAG string,
+ | L_LINESTATUS string,
+ | L_SHIPDATE date,
+ | L_COMMITDATE date,
+ | L_RECEIPTDATE date,
+ | L_SHIPINSTRUCT string,
+ | L_SHIPMODE string,
+ | L_COMMENT string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (L_SHIPDATE)
+ |TBLPROPERTIES (orderByKey='L_DISCOUNT')
+ |LOCATION '$basePath/LINEITEM_MERGETREE_CASE_SENSITIVE'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_case_sensitive
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | sum(l_extendedprice * l_discount) AS revenue
+ |FROM
+ | lineitem_mergetree_case_sensitive
+ |WHERE
+ | l_shipdate >= date'1994-01-01'
+ | AND l_shipdate < date'1994-01-01' + interval 1 year
+ | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+ | AND l_quantity < 24
+ |""".stripMargin
+ runTPCHQueryBySQL(6, sqlStr) { _ => }
+ }
}
// scalastyle:off line.size.limit
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
b/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
index 473ee7f9d..4b929e525 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
@@ -138,7 +138,7 @@ object ConverterUtils extends Logging {
/** Convert StructType to Json */
def convertNamedStructJson(tableSchema: StructType): String = {
val typeNodes = ConverterUtils.collectAttributeTypeNodes(tableSchema)
- val nameList = tableSchema.fieldNames
+ val nameList = tableSchema.fieldNames.map(normalizeColName)
val structBuilder = Type.Struct.newBuilder
for (typeNode <- typeNodes.asScala) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]