This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b8929a804 [GLUTEN-6589][CH] Mergetree supported spark.sql.caseSensitive[Part.2] (#6733)
b8929a804 is described below
commit b8929a804181b4fedbc46ef43465cd56652b0c72
Author: Shuai li <[email protected]>
AuthorDate: Wed Aug 7 15:05:15 2024 +0800
[GLUTEN-6589][CH] Mergetree supported spark.sql.caseSensitive[Part.2] (#6733)
[CH] Mergetree supported spark.sql.caseSensitive[Part.2]
---
.../sql/delta/catalog/ClickHouseTableV2Base.scala | 14 ++++++++-----
.../execution/GlutenClickHouseHiveTableSuite.scala | 24 +++++++++++++++++++++-
2 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 633d23f77..1ab2e12d4 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.delta.catalog
+import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.expression.ConverterUtils.normalizeColName
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
@@ -49,10 +50,9 @@ trait ClickHouseTableV2Base {
if (tableProperties.containsKey("numBuckets")) {
val numBuckets = tableProperties.get("numBuckets").trim.toInt
val bucketColumnNames: Seq[String] =
- tableProperties.get("bucketColumnNames").split(",").map(_.trim).toSeq
-      val sortColumnNames: Seq[String] = if (tableProperties.containsKey("orderByKey")) {
- tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
- } else Seq.empty[String]
+        getCommaSeparatedColumns("bucketColumnNames").getOrElse(Seq.empty[String])
+ val sortColumnNames: Seq[String] =
+ getCommaSeparatedColumns("orderByKey").getOrElse(Seq.empty[String])
Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
} else {
None
@@ -79,7 +79,11 @@ trait ClickHouseTableV2Base {
val tableProperties = deltaProperties
if (tableProperties.containsKey(keyName)) {
if (tableProperties.get(keyName).nonEmpty) {
- val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
+ val keys = tableProperties
+ .get(keyName)
+ .split(",")
+ .map(n => ConverterUtils.normalizeColName(n.trim))
+ .toSeq
keys.foreach(
s => {
if (s.contains(".")) {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
index 8599b3002..fc686056f 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.UTSystemParameters
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -112,6 +112,10 @@ class GlutenClickHouseHiveTableSuite
.set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.gluten.supported.hive.udfs", "my_add")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
+ .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+ .set(
+ "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseSparkCatalog")
.setMaster("local[*]")
}
@@ -1281,4 +1285,22 @@ class GlutenClickHouseHiveTableSuite
compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
sql(s"drop table if exists $tbl")
}
+
+ test("test mergetree write with column case sensitive on hive") {
+ val dataPath = s"$basePath/lineitem_mergetree_bucket"
+ val sourceDF = spark.sql(s"""
+ |select
+ | string_field,
+ | int_field,
+ | long_field
+ | from $txt_user_define_input
+ |""".stripMargin)
+
+ sourceDF.write
+ .format("clickhouse")
+ .option("clickhouse.numBuckets", "1")
+ .option("clickhouse.bucketColumnNames", "STRING_FIELD")
+ .mode(SaveMode.Overwrite)
+ .save(dataPath)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]