This is an automated email from the ASF dual-hosted git repository.

lwz9103 pushed a commit to branch liquid
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git

commit 64df370d4ad673cd9b8076de163c2eaf9b1086d0
Author: lwz9103 <[email protected]>
AuthorDate: Wed Apr 30 15:28:18 2025 +0800

    Support liquid clustering
---
 .../v2/clickhouse/ClickHouseSparkCatalog.scala      |   7 +-
 .../tpcds/GlutenClickHouseTPCDSLiquidSuite.scala    | 221 +++++++++++++++++++++
 .../execution/FileSourceScanExecTransformer.scala   |  14 +-
 .../gluten/execution/ScanTransformerFactory.scala   |   3 +-
 shims/spark35/pom.xml                               |   5 +
 .../sql/execution/AbstractFileSourceScanExec.scala  |   3 +-
 .../sql/execution/FileSourceScanExecShim.scala      |  30 ++-
 .../datasources/v2/utils/CatalogUtil.scala          |   3 +-
 8 files changed, 271 insertions(+), 15 deletions(-)

diff --git a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala
index dde7013962..9c2fc355c6 100644
--- a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala
+++ b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseSparkCatalog.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.clickhouse
 
-import org.apache.gluten.sql.shims.SparkShimLoader
-
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -135,8 +133,7 @@ class ClickHouseSparkCatalog
       writeOptions: Map[String, String],
       sourceQuery: Option[DataFrame],
       operation: TableCreationModes.CreationMode): Table = {
-    val (partitionColumns, maybeBucketSpec) =
-      SparkShimLoader.getSparkShims.convertPartitionTransforms(partitions)
+    val (partitionColumns, maybeBucketSpec, clusterBySpec) = convertTransforms(partitions)
     var newSchema = schema
     var newPartitionColumns = partitionColumns
     var newBucketSpec = maybeBucketSpec
@@ -176,7 +173,7 @@ class ClickHouseSparkCatalog
       comment = commentOpt
     )
 
-    val withDb = verifyTableAndSolidify(tableDesc, None, isMergeTree = true)
+    val withDb = verifyTableAndSolidify(tableDesc, None, clusterBySpec, isMergeTree = true)
 
     val writer = sourceQuery.map {
       df =>
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSLiquidSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSLiquidSuite.scala
new file mode 100644
index 0000000000..71f90bbc9d
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSLiquidSuite.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.tpcds
+
+import org.apache.gluten.execution.GlutenClickHouseTPCDSAbstractSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+class GlutenClickHouseTPCDSLiquidSuite
+  extends GlutenClickHouseTPCDSAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.driver.memory", "10G")
+      .set("spark.memory.offHeap.size", "4G")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.storeAssignmentPolicy", "legacy")
+      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+  }
+
+  def spark35test(testName: String)(testFun: => Unit): Unit = {
+    if (isSparkVersionGE("3.5")) {
+      test(testName)(testFun)
+    } else {
+      ignore(testName)(testFun)
+    }
+  }
+
+  spark35test("test delta parquet liquid tables") {
+    spark.sql("create database if not exists parquet")
+    spark.sql("use parquet")
+    spark.sql("""
+               |CREATE TABLE IF NOT EXISTS store_sales
+               |(
+               | ss_sold_time_sk INT,
+               | ss_item_sk INT,
+               | ss_customer_sk INT,
+               | ss_cdemo_sk INT,
+               | ss_hdemo_sk INT,
+               | ss_addr_sk INT,
+               | ss_store_sk INT,
+               | ss_promo_sk INT,
+               | ss_ticket_number INT,
+               | ss_quantity INT,
+               | ss_wholesale_cost DECIMAL(7,2),
+               | ss_list_price DECIMAL(7,2),
+               | ss_sales_price DECIMAL(7,2),
+               | ss_ext_discount_amt DECIMAL(7,2),
+               | ss_ext_sales_price DECIMAL(7,2),
+               | ss_ext_wholesale_cost DECIMAL(7,2),
+               | ss_ext_list_price DECIMAL(7,2),
+               | ss_ext_tax DECIMAL(7,2),
+               | ss_coupon_amt DECIMAL(7,2),
+               | ss_net_paid DECIMAL(7,2),
+               | ss_net_paid_inc_tax DECIMAL(7,2),
+               | ss_net_profit DECIMAL(7,2),
+               | ss_sold_date_sk INT
+               |)
+               |USING DELTA
+               |CLUSTER BY (ss_sold_date_sk, ss_item_sk);
+               |""".stripMargin)
+    spark.sql("""
+               |CREATE EXTERNAL TABLE IF NOT EXISTS date_dim
+               |(
+               | d_date_sk INT,
+               | d_date_id VARCHAR(16),
+               | d_date DATE,
+               | d_month_seq INT,
+               | d_week_seq INT,
+               | d_quarter_seq INT,
+               | d_year INT,
+               | d_dow INT,
+               | d_moy INT,
+               | d_dom INT,
+               | d_qoy INT,
+               | d_fy_year INT,
+               | d_fy_quarter_seq INT,
+               | d_fy_week_seq INT,
+               | d_day_name VARCHAR(9),
+               | d_quarter_name VARCHAR(6),
+               | d_holiday VARCHAR(1),
+               | d_weekend VARCHAR(1),
+               | d_following_holiday VARCHAR(1),
+               | d_first_dom INT,
+               | d_last_dom INT,
+               | d_same_day_ly INT,
+               | d_same_day_lq INT,
+               | d_current_day VARCHAR(1),
+               | d_current_week VARCHAR(1),
+               | d_current_month VARCHAR(1),
+               | d_current_quarter VARCHAR(1),
+               | d_current_year VARCHAR(1)
+               |)
+               |USING DELTA;
+               |""".stripMargin)
+    spark.sql("insert into parquet.store_sales select * from tpcdsdb.store_sales")
+    spark.sql("insert into parquet.date_dim select * from tpcdsdb.date_dim")
+    val sql = (db: String) => {
+      s"""
+         |SELECT
+         | avg(ss_quantity) agg1,
+         | avg(ss_list_price) agg2,
+         | avg(ss_coupon_amt) agg3,
+         | avg(ss_sales_price) agg4
+         |FROM $db.store_sales join $db.date_dim on ss_sold_date_sk = d_date_sk
+         |where d_year = 2000
+         |LIMIT 100
+         |""".stripMargin
+    }
+    val originDf = spark.sql(sql("tpcdsdb"))
+    val liquidDf = spark.sql(sql("parquet"))
+    checkAnswer(originDf, liquidDf)
+  }
+
+  spark35test("test mergetree liquid tables") {
+    spark.sql("create database if not exists mergetree")
+    spark.sql("use mergetree")
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS store_sales
+                 |(
+                 | ss_sold_time_sk INT,
+                 | ss_item_sk INT,
+                 | ss_customer_sk INT,
+                 | ss_cdemo_sk INT,
+                 | ss_hdemo_sk INT,
+                 | ss_addr_sk INT,
+                 | ss_store_sk INT,
+                 | ss_promo_sk INT,
+                 | ss_ticket_number INT,
+                 | ss_quantity INT,
+                 | ss_wholesale_cost DECIMAL(7,2),
+                 | ss_list_price DECIMAL(7,2),
+                 | ss_sales_price DECIMAL(7,2),
+                 | ss_ext_discount_amt DECIMAL(7,2),
+                 | ss_ext_sales_price DECIMAL(7,2),
+                 | ss_ext_wholesale_cost DECIMAL(7,2),
+                 | ss_ext_list_price DECIMAL(7,2),
+                 | ss_ext_tax DECIMAL(7,2),
+                 | ss_coupon_amt DECIMAL(7,2),
+                 | ss_net_paid DECIMAL(7,2),
+                 | ss_net_paid_inc_tax DECIMAL(7,2),
+                 | ss_net_profit DECIMAL(7,2),
+                 | ss_sold_date_sk INT
+                 |)
+                 |USING clickhouse
+                 |CLUSTER BY (ss_sold_date_sk, ss_item_sk)
+                 |LOCATION '$warehouse/mergetree.db/store_sales'
+                 |""".stripMargin)
+    spark.sql(s"""
+                 |CREATE EXTERNAL TABLE IF NOT EXISTS date_dim
+                 |(
+                 | d_date_sk INT,
+                 | d_date_id VARCHAR(16),
+                 | d_date DATE,
+                 | d_month_seq INT,
+                 | d_week_seq INT,
+                 | d_quarter_seq INT,
+                 | d_year INT,
+                 | d_dow INT,
+                 | d_moy INT,
+                 | d_dom INT,
+                 | d_qoy INT,
+                 | d_fy_year INT,
+                 | d_fy_quarter_seq INT,
+                 | d_fy_week_seq INT,
+                 | d_day_name VARCHAR(9),
+                 | d_quarter_name VARCHAR(6),
+                 | d_holiday VARCHAR(1),
+                 | d_weekend VARCHAR(1),
+                 | d_following_holiday VARCHAR(1),
+                 | d_first_dom INT,
+                 | d_last_dom INT,
+                 | d_same_day_ly INT,
+                 | d_same_day_lq INT,
+                 | d_current_day VARCHAR(1),
+                 | d_current_week VARCHAR(1),
+                 | d_current_month VARCHAR(1),
+                 | d_current_quarter VARCHAR(1),
+                 | d_current_year VARCHAR(1)
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$warehouse/mergetree.db/date_dim'
+                 |""".stripMargin)
+    spark.sql("insert into mergetree.store_sales select * from tpcdsdb.store_sales")
+    spark.sql("insert into mergetree.date_dim select * from tpcdsdb.date_dim")
+    val sql = (db: String) => {
+      s"""
+         |SELECT
+         | avg(ss_quantity) agg1,
+         | avg(ss_list_price) agg2,
+         | avg(ss_coupon_amt) agg3,
+         | avg(ss_sales_price) agg4
+         |FROM $db.store_sales join $db.date_dim on ss_sold_date_sk = d_date_sk
+         |where d_year = 2000
+         |LIMIT 100
+         |""".stripMargin
+    }
+    val originDf = spark.sql(sql("tpcdsdb"))
+    val liquidDf = spark.sql(sql("mergetree"))
+    checkAnswer(originDf, liquidDf)
+  }
+
+}
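
Note: the suite above only checks result parity against the plain tpcdsdb tables; a possible follow-up check, not exercised by the suite itself, is to confirm that the clustering columns were actually recorded on the Delta table. A minimal sketch, assuming the Delta release in use exposes a clusteringColumns field through DESCRIBE DETAIL:

    // Sketch only; assumes this Delta version reports clusteringColumns in
    // DESCRIBE DETAIL for liquid-clustered tables.
    val detail = spark.sql("DESCRIBE DETAIL parquet.store_sales")
    detail.select("clusteringColumns").show(truncate = false)
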
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 2716e4e1d3..864f404925 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -46,7 +46,8 @@ case class FileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -56,7 +57,9 @@
     optionalNumCoalescedBuckets,
     dataFilters,
     tableIdentifier,
-    disableBucketedScan) {
+    disableBucketedScan,
+    clusterKeyFilters
+  ) {
 
   override def doCanonicalize(): FileSourceScanExecTransformer = {
     FileSourceScanExecTransformer(
@@ -84,7 +87,8 @@ abstract class FileSourceScanExecTransformerBase(
     optionalNumCoalescedBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier],
-    disableBucketedScan: Boolean = false)
+    disableBucketedScan: Boolean = false,
+    clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends FileSourceScanExecShim(
     relation,
     output,
@@ -94,7 +98,9 @@
     optionalNumCoalescedBuckets,
     dataFilters,
     tableIdentifier,
-    disableBucketedScan)
+    disableBucketedScan,
+    clusterKeyFilters
+  )
   with DatasourceScanTransformer {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 2a8cc91382..6348ea5697 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -34,7 +34,8 @@ object ScanTransformerFactory {
       scanExec.optionalNumCoalescedBuckets,
       scanExec.dataFilters,
       scanExec.tableIdentifier,
-      scanExec.disableBucketedScan
+      scanExec.disableBucketedScan,
+      scanExec.clusterKeyFilters
     )
   }
 
diff --git a/shims/spark35/pom.xml b/shims/spark35/pom.xml
index e915fe1b21..94d963f575 100644
--- a/shims/spark35/pom.xml
+++ b/shims/spark35/pom.xml
@@ -53,6 +53,11 @@
       <scope>provided</scope>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 6cca913dd7..b01af65c41 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -64,7 +64,8 @@ abstract class AbstractFileSourceScanExec(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends FileSourceScanLike {
 
   override def supportsColumnar: Boolean = {
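
For context on the FileSourceScanExecShim change below: Delta's snapshot.filesForScan returns AddFile entries whose path is typically relative to the table root, while the file statuses in selectedPartitions carry absolute URIs, which is presumably why the patch matches paths with contains rather than equality. A minimal standalone sketch of that matching step (names are illustrative, not part of the commit):

    // Sketch only: keep the scan's absolute file paths whose string contains
    // one of the (relative) paths retained by Delta's cluster-key pruning.
    def keepClusteredFiles(scanPaths: Seq[String], prunedPaths: Seq[String]): Seq[String] =
      scanPaths.filter(abs => prunedPaths.exists(rel => abs.contains(rel)))

    // Example: only part-00000 survives the pruning.
    // keepClusteredFiles(
    //   Seq("hdfs://nn/warehouse/t/part-00000.parquet", "hdfs://nn/warehouse/t/part-00001.parquet"),
    //   Seq("part-00000.parquet"))
    // returns Seq("hdfs://nn/warehouse/t/part-00000.parquet")
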
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 463afbbca4..8c91800cbf 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -21,7 +21,10 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -37,7 +40,8 @@ abstract class FileSourceScanExecShim(
     optionalNumCoalescedBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier],
-    disableBucketedScan: Boolean = false)
+    disableBucketedScan: Boolean = false,
+    clusterKeyFilters: Seq[Expression] = Seq.empty)
   extends AbstractFileSourceScanExec(
     relation,
     output,
@@ -47,7 +51,9 @@
     optionalNumCoalescedBuckets,
     dataFilters,
     tableIdentifier,
-    disableBucketedScan) {
+    disableBucketedScan,
+    clusterKeyFilters
+  ) {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] = Map()
@@ -116,6 +122,24 @@
         setFilesNumAndSizeMetric(ret, static = false)
         ret
       }(t => driverMetrics("pruningTime").set(t))
+    } else if (clusterKeyFilters.nonEmpty) {
+      val snapshot: Snapshot = relation.location match {
+        case index: PreparedDeltaFileIndex => index.preparedScan.scannedSnapshot
+        case index: TahoeLogFileIndex => index.getSnapshot
+        case _ => null
+      }
+      val filters = clusterKeyFilters.map {
+        case d: DynamicPruningExpression => d.child
+        case f => f
+      }
+      val filteredFiles = snapshot.filesForScan(filters, keepNumRecords = false).files
+      val pds: Array[PartitionDirectory] = selectedPartitions
+      assert(pds.length == 1)
+      val selectedFiles = pds.head.files
+        .filter(fp => filteredFiles.map(x => x.path).exists(fp.getPath.toString.contains(_)))
+      val returnedFiles = Array(PartitionDirectory(InternalRow.empty, selectedFiles))
+      setFilesNumAndSizeMetric(returnedFiles, static = false)
+      returnedFiles
     } else {
       selectedPartitions
     }
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
index 37ae5984e3..4a521a9832 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/utils/CatalogUtil.scala
@@ -23,6 +23,7 @@ object CatalogUtil {
 
   def convertPartitionTransforms(partitions: Seq[Transform]): (Seq[String], Option[BucketSpec]) = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TransformHelper
-    partitions.toSeq.convertTransforms
+    val (partCols, bucketSpec, clusterBySpec) = partitions.toSeq.convertTransforms
+    (partCols, bucketSpec)
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
