This is an automated email from the ASF dual-hosted git repository.

lwz9103 pushed a commit to branch liquid
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

commit 64df370d4ad673cd9b8076de163c2eaf9b1086d0
Author: lwz9103 <[email protected]>
AuthorDate: Wed Apr 30 15:28:18 2025 +0800

    Support liquid clustering
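    
    With this change a table can declare clustering columns via CLUSTER BY
    (Delta "liquid clustering" or a clustered clickhouse/MergeTree table), and
    the Gluten file-source scan can prune files using those cluster keys. A
    minimal sketch of the DDL exercised by the new test suite follows; the
    database, table name, location and the count query are illustrative only,
    assuming a Spark 3.5 session with the Delta extension enabled:
    
        // Hypothetical example: declare a clustered table and query it.
        spark.sql("""
          CREATE TABLE IF NOT EXISTS sales (ss_item_sk INT, ss_sold_date_sk INT)
          USING clickhouse
          CLUSTER BY (ss_sold_date_sk, ss_item_sk)
          LOCATION '/tmp/warehouse/sales'
        """)
        // Filters on the cluster keys can then be used for file-level pruning.
        spark.sql("SELECT count(*) FROM sales WHERE ss_sold_date_sk = 2451545").show()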
---
 .../v2/clickhouse/ClickHouseSparkCatalog.scala     |   7 +-
 .../tpcds/GlutenClickHouseTPCDSLiquidSuite.scala   | 221 +++++++++++++++++++++
 .../execution/FileSourceScanExecTransformer.scala  |  14 +-
 .../gluten/execution/ScanTransformerFactory.scala  |   3 +-
 shims/spark35/pom.xml                              |   5 +
 .../sql/execution/AbstractFileSourceScanExec.scala |   3 +-
 .../sql/execution/FileSourceScanExecShim.scala     |  30 ++-
 .../datasources/v2/utils/CatalogUtil.scala         |   3 +-
 8 files changed, 271 insertions(+), 15 deletions(-)

diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala
index dde7013962..9c2fc355c6 100644
--- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala
+++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.clickhouse
 
-import org.apache.gluten.sql.shims.SparkShimLoader
-
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -135,8 +133,7 @@ class ClickHouseSparkCatalog
       writeOptions: Map[String, String],
       sourceQuery: Option[DataFrame],
       operation: TableCreationModes.CreationMode): Table = {
-    val (partitionColumns, maybeBucketSpec) =
-      SparkShimLoader.getSparkShims.convertPartitionTransforms(partitions)
+    val (partitionColumns, maybeBucketSpec, clusterBySpec) = convertTransforms(partitions)
     var newSchema = schema
     var newPartitionColumns = partitionColumns
     var newBucketSpec = maybeBucketSpec
@@ -176,7 +173,7 @@ class ClickHouseSparkCatalog
       comment = commentOpt
     )
 
-    val withDb = verifyTableAndSolidify(tableDesc, None, isMergeTree = true)
+    val withDb = verifyTableAndSolidify(tableDesc, None, clusterBySpec, isMergeTree = true)
 
     val writer = sourceQuery.map {
       df =>
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSLiquidSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSLiquidSuite.scala
new file mode 100644
index 0000000000..71f90bbc9d
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSLiquidSuite.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution.GlutenClickHouseTPCDSAbstractSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+class GlutenClickHouseTPCDSLiquidSuite
+  extends GlutenClickHouseTPCDSAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.driver.memory", "10G")
+      .set("spark.memory.offHeap.size", "4G")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.teststoreAssignmentPolicy", "legacy")
+      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+  }
+
+  def spark35test(testName: String)(testFun: => Unit): Unit = {
+    if (isSparkVersionGE("3.5")) {
+      test(testName)(testFun)
+    } else {
+      ignore(testName)(testFun)
+    }
+  }
+
+  spark35test("test delta parquet liquid tables") {
+    spark.sql("create database if not exists parquet")
+    spark.sql("use parquet")
+    spark.sql("""
+                |CREATE TABLE IF NOT EXISTS store_sales
+                |(
+                |     ss_sold_time_sk INT,
+                |     ss_item_sk INT,
+                |     ss_customer_sk INT,
+                |     ss_cdemo_sk INT,
+                |     ss_hdemo_sk INT,
+                |     ss_addr_sk INT,
+                |     ss_store_sk INT,
+                |     ss_promo_sk INT,
+                |     ss_ticket_number INT,
+                |     ss_quantity INT,
+                |     ss_wholesale_cost DECIMAL(7,2),
+                |     ss_list_price DECIMAL(7,2),
+                |     ss_sales_price DECIMAL(7,2),
+                |     ss_ext_discount_amt DECIMAL(7,2),
+                |     ss_ext_sales_price DECIMAL(7,2),
+                |     ss_ext_wholesale_cost DECIMAL(7,2),
+                |     ss_ext_list_price DECIMAL(7,2),
+                |     ss_ext_tax DECIMAL(7,2),
+                |     ss_coupon_amt DECIMAL(7,2),
+                |     ss_net_paid DECIMAL(7,2),
+                |     ss_net_paid_inc_tax DECIMAL(7,2),
+                |     ss_net_profit DECIMAL(7,2),
+                |     ss_sold_date_sk INT
+                |)
+                |USING DELTA
+                |CLUSTER BY (ss_sold_date_sk, ss_item_sk);
+                |""".stripMargin)
+    spark.sql("""
+                |CREATE EXTERNAL TABLE IF NOT EXISTS date_dim
+                |(
+                |     d_date_sk INT,
+                |     d_date_id VARCHAR(16),
+                |     d_date DATE,
+                |     d_month_seq INT,
+                |     d_week_seq INT,
+                |     d_quarter_seq INT,
+                |     d_year INT,
+                |     d_dow INT,
+                |     d_moy INT,
+                |     d_dom INT,
+                |     d_qoy INT,
+                |     d_fy_year INT,
+                |     d_fy_quarter_seq INT,
+                |     d_fy_week_seq INT,
+                |     d_day_name VARCHAR(9),
+                |     d_quarter_name VARCHAR(6),
+                |     d_holiday VARCHAR(1),
+                |     d_weekend VARCHAR(1),
+                |     d_following_holiday VARCHAR(1),
+                |     d_first_dom INT,
+                |     d_last_dom INT,
+                |     d_same_day_ly INT,
+                |     d_same_day_lq INT,
+                |     d_current_day VARCHAR(1),
+                |     d_current_week VARCHAR(1),
+                |     d_current_month VARCHAR(1),
+                |     d_current_quarter VARCHAR(1),
+                |     d_current_year VARCHAR(1)
+                |)
+                |USING DELTA;
+                |""".stripMargin)
+    spark.sql("insert into parquet.store_sales select * from 
tpcdsdb.store_sales")
+    spark.sql("insert into parquet.date_dim select * from tpcdsdb.date_dim")
+    val sql = (db: String) => {
+      s"""
+         |SELECT
+         |  avg(ss_quantity) agg1,
+         |  avg(ss_list_price) agg2,
+         |  avg(ss_coupon_amt) agg3,
+         |  avg(ss_sales_price) agg4
+         |FROM $db.store_sales join $db.date_dim on ss_sold_date_sk = d_date_sk
+         |where d_year = 2000
+         |LIMIT 100
+         |""".stripMargin
+    }
+    val originDf = spark.sql(sql("tpcdsdb"))
+    val liquidDf = spark.sql(sql("parquet"))
+    checkAnswer(originDf, liquidDf)
+  }
+
+  spark35test("test mergetree liquid tables") {
+    spark.sql("create database if not exists mergetree")
+    spark.sql("use mergetree")
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS store_sales
+                 |(
+                 |     ss_sold_time_sk INT,
+                 |     ss_item_sk INT,
+                 |     ss_customer_sk INT,
+                 |     ss_cdemo_sk INT,
+                 |     ss_hdemo_sk INT,
+                 |     ss_addr_sk INT,
+                 |     ss_store_sk INT,
+                 |     ss_promo_sk INT,
+                 |     ss_ticket_number INT,
+                 |     ss_quantity INT,
+                 |     ss_wholesale_cost DECIMAL(7,2),
+                 |     ss_list_price DECIMAL(7,2),
+                 |     ss_sales_price DECIMAL(7,2),
+                 |     ss_ext_discount_amt DECIMAL(7,2),
+                 |     ss_ext_sales_price DECIMAL(7,2),
+                 |     ss_ext_wholesale_cost DECIMAL(7,2),
+                 |     ss_ext_list_price DECIMAL(7,2),
+                 |     ss_ext_tax DECIMAL(7,2),
+                 |     ss_coupon_amt DECIMAL(7,2),
+                 |     ss_net_paid DECIMAL(7,2),
+                 |     ss_net_paid_inc_tax DECIMAL(7,2),
+                 |     ss_net_profit DECIMAL(7,2),
+                 |     ss_sold_date_sk INT
+                 |)
+                 |USING clickhouse
+                 |CLUSTER BY (ss_sold_date_sk, ss_item_sk)
+                 |LOCATION '$warehouse/mergetree.db/store_sales'
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |CREATE EXTERNAL TABLE IF NOT EXISTS date_dim
+                 |(
+                 |     d_date_sk INT,
+                 |     d_date_id VARCHAR(16),
+                 |     d_date DATE,
+                 |     d_month_seq INT,
+                 |     d_week_seq INT,
+                 |     d_quarter_seq INT,
+                 |     d_year INT,
+                 |     d_dow INT,
+                 |     d_moy INT,
+                 |     d_dom INT,
+                 |     d_qoy INT,
+                 |     d_fy_year INT,
+                 |     d_fy_quarter_seq INT,
+                 |     d_fy_week_seq INT,
+                 |     d_day_name VARCHAR(9),
+                 |     d_quarter_name VARCHAR(6),
+                 |     d_holiday VARCHAR(1),
+                 |     d_weekend VARCHAR(1),
+                 |     d_following_holiday VARCHAR(1),
+                 |     d_first_dom INT,
+                 |     d_last_dom INT,
+                 |     d_same_day_ly INT,
+                 |     d_same_day_lq INT,
+                 |     d_current_day VARCHAR(1),
+                 |     d_current_week VARCHAR(1),
+                 |     d_current_month VARCHAR(1),
+                 |     d_current_quarter VARCHAR(1),
+                 |     d_current_year VARCHAR(1)
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$warehouse/mergetree.db/date_dim'
+                 |""".stripMargin)
+    spark.sql("insert into mergetree.store_sales select * from 
tpcdsdb.store_sales")
+    spark.sql("insert into mergetree.date_dim select * from tpcdsdb.date_dim")
+    val sql = (db: String) => {
+      s"""
+         |SELECT
+         |  avg(ss_quantity) agg1,
+         |  avg(ss_list_price) agg2,
+         |  avg(ss_coupon_amt) agg3,
+         |  avg(ss_sales_price) agg4
+         |FROM $db.store_sales join $db.date_dim on ss_sold_date_sk = d_date_sk
+         |where d_year = 2000
+         |LIMIT 100
+         |""".stripMargin
+    }
+    val originDf = spark.sql(sql("tpcdsdb"))
+    val liquidDf = spark.sql(sql("mergetree"))
+    checkAnswer(originDf, liquidDf)
+  }
+
+}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 2716e4e1d3..864f404925 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -46,7 +46,8 @@ case class FileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -56,7 +57,9 @@ case class FileSourceScanExecTransformer(
     optionalNumCoalescedBuckets,
     dataFilters,
     tableIdentifier,
-    disableBucketedScan) {
+    disableBucketedScan,
+    clusterKeyFilters
+  ) {
 
   override def doCanonicalize(): FileSourceScanExecTransformer = {
     FileSourceScanExecTransformer(
@@ -84,7 +87,8 @@ abstract class FileSourceScanExecTransformerBase(
     optionalNumCoalescedBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier],
-    disableBucketedScan: Boolean = false)
+    disableBucketedScan: Boolean = false,
+    clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends FileSourceScanExecShim(
     relation,
     output,
@@ -94,7 +98,9 @@ abstract class FileSourceScanExecTransformerBase(
     optionalNumCoalescedBuckets,
     dataFilters,
     tableIdentifier,
-    disableBucketedScan)
+    disableBucketedScan,
+    clusterKeyFilters
+  )
   with DatasourceScanTransformer {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 2a8cc91382..6348ea5697 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -34,7 +34,8 @@ object ScanTransformerFactory {
       scanExec.optionalNumCoalescedBuckets,
       scanExec.dataFilters,
       scanExec.tableIdentifier,
-      scanExec.disableBucketedScan
+      scanExec.disableBucketedScan,
+      scanExec.clusterKeyFilters
     )
   }
 
diff --git a/shims/spark35/pom.xml b/shims/spark35/pom.xml
index e915fe1b21..94d963f575 100644
--- a/shims/spark35/pom.xml
+++ b/shims/spark35/pom.xml
@@ -53,6 +53,11 @@
       <scope>provided</scope>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 6cca913dd7..b01af65c41 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -64,7 +64,8 @@ abstract class AbstractFileSourceScanExec(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends FileSourceScanLike {
 
   override def supportsColumnar: Boolean = {
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 463afbbca4..8c91800cbf 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -21,7 +21,10 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -37,7 +40,8 @@ abstract class FileSourceScanExecShim(
     optionalNumCoalescedBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier],
-    disableBucketedScan: Boolean = false)
+    disableBucketedScan: Boolean = false,
+    clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends AbstractFileSourceScanExec(
     relation,
     output,
@@ -47,7 +51,9 @@ abstract class FileSourceScanExecShim(
     optionalNumCoalescedBuckets,
     dataFilters,
     tableIdentifier,
-    disableBucketedScan) {
+    disableBucketedScan,
+    clusterKeyFilters
+  ) {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
@@ -116,6 +122,24 @@ abstract class FileSourceScanExecShim(
         setFilesNumAndSizeMetric(ret, static = false)
         ret
       }(t => driverMetrics("pruningTime").set(t))
+    } else if (clusterKeyFilters.nonEmpty) {
+      val snapshot: Snapshot = relation.location match {
+        case index: PreparedDeltaFileIndex => index.preparedScan.scannedSnapshot
+        case index: TahoeLogFileIndex => index.getSnapshot
+        case _ => null
+      }
+      val filters = clusterKeyFilters.map {
+        case d: DynamicPruningExpression => d.child
+        case f => f
+      }
+      val filteredFiles = snapshot.filesForScan(filters, keepNumRecords = false).files
+      val pds: Array[PartitionDirectory] = selectedPartitions
+      assert(pds.length == 1)
+      val selectedFiles = pds.head.files
+        .filter(fp => filteredFiles.map(x => x.path).exists(fp.getPath.toString.contains(_)))
+      val returnedFiles = Array(PartitionDirectory(InternalRow.empty, selectedFiles))
+      setFilesNumAndSizeMetric(returnedFiles, static = false)
+      returnedFiles
     } else {
       selectedPartitions
     }
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
index 37ae5984e3..4a521a9832 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
@@ -23,6 +23,7 @@ object CatalogUtil {
 
   def convertPartitionTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    partitions.toSeq.convertTransforms
+    val (partCols, bucketSpec, clusterBySpec) = partitions.toSeq.convertTransforms
+    (partCols, bucketSpec)
   }
 }

