This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ddf28c097b [GLUTEN-7812][CH] Fix the query failed for the mergetree
format when the 'spark.databricks.delta.stats.skipping' is off (#7813)
ddf28c097b is described below
commit ddf28c097b0dfe2fbba7c2a4fdbfee5323bd833d
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Nov 6 07:13:51 2024 +0800
[GLUTEN-7812][CH] Fix the query failed for the mergetree format when the
'spark.databricks.delta.stats.skipping' is off (#7813)
With Spark 3.3 + Delta 2.3, when
'spark.databricks.delta.stats.skipping' is off, some queries that contain a
subquery fail with the error message below:
```
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:268)
at
org.apache.spark.sql.delta.SubqueryTransformerHelper.transformWithSubqueries(SubqueryTransformerHelper.scala:42)
at
org.apache.spark.sql.delta.SubqueryTransformerHelper.transformWithSubqueries$(SubqueryTransformerHelper.scala:40)
at
org.apache.spark.sql.delta.stats.PrepareDeltaScan.transformWithSubqueries(PrepareDeltaScan.scala:291)
at
org.apache.spark.sql.delta.PreprocessTableWithDVs.preprocessTablesWithDVs(PreprocessTableWithDVs.scala:67)
at
org.apache.spark.sql.delta.PreprocessTableWithDVs.preprocessTablesWithDVs$(PreprocessTableWithDVs.scala:66)
at
org.apache.spark.sql.delta.stats.PrepareDeltaScan.preprocessTablesWithDVs(PrepareDeltaScan.scala:291)
at
org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply(PrepareDeltaScan.scala:227)
at
org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply$(PrepareDeltaScan.scala:191)
at
org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:291)
at
org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:291)
```
Close #7812.
---
backends-clickhouse/pom.xml | 1 +
.../rules/CHOptimizeMetadataOnlyDeltaQuery.scala | 30 ++
.../rules/CHOptimizeMetadataOnlyDeltaQuery.scala | 77 ++++
.../spark/sql/delta/stats/PrepareDeltaScan.scala | 406 +++++++++++++++++++++
.../delta-32/io/delta/tables/ClickhouseTable.scala | 7 +-
.../delta/ClickhouseOptimisticTransaction.scala | 14 +-
.../org/apache/spark/sql/delta/DeltaLog.scala | 2 +-
.../org/apache/spark/sql/delta/Snapshot.scala | 2 +-
.../sql/delta/catalog/ClickHouseTableV2.scala | 8 -
.../spark/sql/delta/commands/DeleteCommand.scala | 2 +-
.../sql/delta/commands/OptimizeTableCommand.scala | 2 +-
.../spark/sql/delta/commands/UpdateCommand.scala | 2 +-
.../spark/sql/delta/commands/VacuumCommand.scala | 5 +-
.../commands/merge/ClassicMergeExecutor.scala | 2 +-
.../rules/CHOptimizeMetadataOnlyDeltaQuery.scala | 77 ++++
.../gluten/backendsapi/clickhouse/CHBackend.scala | 4 +
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 2 +
.../GlutenClickHouseMergeTreeWriteSuite.scala | 119 +++++-
.../gluten/extension/injector/SparkInjector.scala | 4 +
19 files changed, 722 insertions(+), 44 deletions(-)
diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index b00470f9ea..0f593a861b 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -368,6 +368,7 @@
<excludes>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
+
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/stats/*.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
</excludes>
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
new file mode 100644
index 0000000000..a360fa8d72
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.rules
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.metering.DeltaLogging
+
+class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with DeltaLogging {
+
+ // Delta 2.0 cannot optimize queries from metadata, so this rule returns the plan unchanged.
+ override def apply(plan: LogicalPlan): LogicalPlan = plan
+}
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
new file mode 100644
index 0000000000..dbb5c4050a
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.rules
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.{OptimisticTransaction, Snapshot,
SubqueryTransformerHelper}
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.DeltaScanGenerator
+
+import org.apache.hadoop.fs.Path
+
+class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with DeltaLogging
+ with SubqueryTransformerHelper
+ with OptimizeMetadataOnlyDeltaQuery {
+
+ private val scannedSnapshots =
+ new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]
+
+ protected def getDeltaScanGenerator(index: TahoeLogFileIndex):
DeltaScanGenerator = {
+ // Time-travel queries already carry a fixed snapshot, so the index's own snapshot is returned.
+ if (index.isTimeTravelQuery) return index.getSnapshot
+ OptimisticTransaction
+ .getActive()
+ .map(_.getDeltaScanGenerator(index))
+ .getOrElse {
+ // No active transaction: remember the snapshot per log id the first time the log is seen.
+ scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ =>
index.getSnapshot)
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ // Should not be applied to subqueries to avoid duplicate delta jobs.
+ val isSubquery = isSubqueryRoot(plan)
+ // Should not be applied to DataSourceV2 write plans, because they'll be
planned later
+ // through a V1 fallback and only that later planning takes place within
the transaction.
+ val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
+ if (isSubquery || isDataSourceV2) {
+ return plan
+ }
+ // With 'stats.skipping' off and the Gluten flag on, still use metadata to optimize the
query for count/min/max
+ if (
+ spark.sessionState.conf
+ .getConfString(
+ CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE,
+
CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE)
+ .toBoolean &&
+ !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING, true)
+ ) {
+ optimizeQueryWithMetadata(plan)
+ } else {
+ plan
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala
new file mode 100644
index 0000000000..21e31d3541
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.stats
+
+import java.util.Objects
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.files.{TahoeFileIndexWithSnapshot,
TahoeLogFileIndex}
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.PROJECT
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+/**
+ * Gluten overwrite Delta:
+ *
+ * This file is copied from Delta 2.3.0 and modified to overcome the following issue:
+ * 1. Subquery roots and DataSourceV2 write plans are returned unchanged, even when
+ * stats.skipping is turned off.
+ */
+
+/**
+ * Before query planning, we prepare any scans over delta tables by pushing
+ * any projections or filters in allowing us to gather more accurate statistics
+ * for CBO and metering.
+ *
+ * Note the following
+ * - This rule also ensures that all reads from the same delta log use the
+ * same snapshot of log thus providing snapshot isolation.
+ * - If this rule is invoked within an active [[OptimisticTransaction]], then
+ * the scans are generated using the transaction.
+ */
+trait PrepareDeltaScanBase extends Rule[LogicalPlan]
+ with PredicateHelper
+ with DeltaLogging
+ with OptimizeMetadataOnlyDeltaQuery
+ with PreprocessTableWithDVs { self: PrepareDeltaScan =>
+
+ /**
+ * Tracks the first-access snapshots of other logs planned by this rule. The
+ * snapshots are keyed by the log's unique id (`deltaLog.compositeId`). Note
+ * that the lifetime of this rule is a single query, therefore, the map tracks
+ * the snapshots only within a query.
+ */
+ private val scannedSnapshots =
+ new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]
+
+ /**
+ * Gets the [[DeltaScanGenerator]] for the given log, which will be used to
+ * generate [[DeltaScan]]s. Every time this method is called on a log within
+ * the lifetime of this rule (i.e., the lifetime of the query for which this
+ * rule was instantiated), the returned generator will read a snapshot that is
+ * pinned on the first access for that log.
+ *
+ * Internally, it will use the snapshot of the file index, the snapshot of the
+ * active transaction (if any), or the latest snapshot of the given log.
+ *
+ * For a time-travel index the index's snapshot is returned immediately,
+ * without consulting the callback. Otherwise, if a callback has been installed
+ * in the `PrepareDeltaScanBase` object, it is invoked with the chosen generator
+ * before that generator is returned.
+ */
+ protected def getDeltaScanGenerator(index: TahoeLogFileIndex):
DeltaScanGenerator = {
+ // The first case means that we've fixed the table snapshot for time travel
+ if (index.isTimeTravelQuery) return index.getSnapshot
+ val scanGenerator = OptimisticTransaction.getActive()
+ .map(_.getDeltaScanGenerator(index))
+ .getOrElse {
+ // Will be called only when the log is accessed the first time
+ scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ =>
index.getSnapshot)
+ }
+ import PrepareDeltaScanBase._
+ if (onGetDeltaScanGeneratorCallback != null)
onGetDeltaScanGeneratorCallback(scanGenerator)
+ scanGenerator
+ }
+
+ /**
+ * Helper method to generate a [[PreparedDeltaFileIndex]] from the file
+ * index's log, path and `versionToUse` together with the given prepared scan.
+ * Asserts that `fileIndex.partitionFilters` is empty.
+ */
+ protected def getPreparedIndex(
+ preparedScan: DeltaScan,
+ fileIndex: TahoeLogFileIndex): PreparedDeltaFileIndex = {
+ assert(fileIndex.partitionFilters.isEmpty,
+ "Partition filters should have been extracted by DeltaAnalysis.")
+ PreparedDeltaFileIndex(
+ spark,
+ fileIndex.deltaLog,
+ fileIndex.path,
+ preparedScan,
+ fileIndex.versionToUse)
+ }
+
+ /**
+ * Scan files using the given `filters` and return `DeltaScan`.
+ *
+ * Note: when `limitOpt` is non empty, `filters` must contain only partition
+ * filters. Otherwise, it can contain arbitrary filters. See `DeltaTableScan`
+ * for more details.
+ *
+ * With a limit, the scan generator is asked for files directly with that
+ * limit. Without one, the filters returned by
+ * `GeneratedColumn.generatePartitionFilters` are appended to `filters` first
+ * when `GeneratedColumn.partitionFilterOptimizationEnabled` is true.
+ */
+ protected def filesForScan(
+ scanGenerator: DeltaScanGenerator,
+ limitOpt: Option[Int],
+ filters: Seq[Expression],
+ delta: LogicalRelation): DeltaScan = {
+ withStatusCode("DELTA", "Filtering files for query") {
+ if (limitOpt.nonEmpty) {
+ // If we trigger limit push down, the filters must be partition
filters. Since
+ // there are no data filters, we don't need to apply Generated Columns
+ // optimization. See `DeltaTableScan` for more details.
+ return scanGenerator.filesForScan(limitOpt.get, filters)
+ }
+ val filtersForScan =
+ if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
+ filters
+ } else {
+ val generatedPartitionFilters =
GeneratedColumn.generatePartitionFilters(
+ spark, scanGenerator.snapshotToScan, filters, delta)
+ filters ++ generatedPartitionFilters
+ }
+ scanGenerator.filesForScan(filtersForScan)
+ }
+ }
+
+ /**
+ * Prepares delta scans sequentially. Every plan node matched by
+ * [[DeltaTableScan]] is rewritten through `optimizeGeneratedColumns` with a
+ * prepared index; scans whose projection-stripped plans canonicalize to the
+ * same plan share a single [[DeltaScan]].
+ */
+ protected def prepareDeltaScan(plan: LogicalPlan): LogicalPlan = {
+ // A map from the canonicalized form of a DeltaTableScan operator to its
corresponding delta
+ // scan. This map is used to avoid fetching duplicate delta indexes for
structurally-equal
+ // delta scans.
+ val deltaScans = new mutable.HashMap[LogicalPlan, DeltaScan]()
+
+ transformWithSubqueries(plan) {
+ case scan @ DeltaTableScan(planWithRemovedProjections, filters,
fileIndex,
+ limit, delta) =>
+ val scanGenerator = getDeltaScanGenerator(fileIndex)
+ val preparedScan =
deltaScans.getOrElseUpdate(planWithRemovedProjections.canonicalized,
+ filesForScan(scanGenerator, limit, filters, delta))
+ val preparedIndex = getPreparedIndex(preparedScan, fileIndex)
+ optimizeGeneratedColumns(scan, preparedIndex, filters, limit, delta)
+ }
+ }
+
+ protected def optimizeGeneratedColumns(
+ scan: LogicalPlan,
+ preparedIndex: PreparedDeltaFileIndex,
+ filters: Seq[Expression],
+ limit: Option[Int],
+ delta: LogicalRelation): LogicalPlan = {
+ if (limit.nonEmpty) {
+ // If we trigger limit push down, the filters must be partition filters.
Since
+ // there are no data filters, we don't need to apply Generated Columns
+ // optimization. See `DeltaTableScan` for more details.
+ return DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
+ }
+ if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
+ DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
+ } else {
+ val generatedPartitionFilters =
+ GeneratedColumn.generatePartitionFilters(spark, preparedIndex,
filters, delta)
+ val scanWithFilters =
+ if (generatedPartitionFilters.nonEmpty) {
+ scan transformUp {
+ case delta @ DeltaTable(_: TahoeLogFileIndex) =>
+ Filter(generatedPartitionFilters.reduceLeft(And), delta)
+ }
+ } else {
+ scan
+ }
+ DeltaTableUtils.replaceFileIndex(scanWithFilters, preparedIndex)
+ }
+ }
+
+ override def apply(_plan: LogicalPlan): LogicalPlan = {
+ var plan = _plan
+
+ // --- modified start (Gluten: the guard below runs whether or not stats skipping is on)
+ // Should not be applied to subqueries to avoid duplicate delta jobs.
+ val isSubquery = isSubqueryRoot(plan)
+ // Should not be applied to DataSourceV2 write plans, because they'll be
planned later
+ // through a V1 fallback and only that later planning takes place within
the transaction.
+ val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
+ if (isSubquery || isDataSourceV2) {
+ return plan
+ }
+
+ val shouldPrepareDeltaScan = (
+ spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING)
+ )
+ val updatedPlan = if (shouldPrepareDeltaScan) {
+ if
(spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED))
{
+ plan = optimizeQueryWithMetadata(plan)
+ }
+ prepareDeltaScan(plan)
+ } else {
+ // If this query is running inside an active transaction and is touching
the same table
+ // as the transaction, then mark that the entire table as tainted to be
safe.
+ OptimisticTransaction.getActive.foreach { txn =>
+ val logsInPlan = plan.collect { case DeltaTable(fileIndex) =>
fileIndex.deltaLog }
+ if (logsInPlan.exists(_.isSameLogAs(txn.deltaLog))) {
+ txn.readWholeTable()
+ }
+ }
+
+ // Just return the plan if statistics based skipping is off.
+ // It will fall back to just partition pruning at planning time.
+ plan
+ }
+ // --- modified end
+ preprocessTablesWithDVs(updatedPlan)
+ }
+
+ /**
+ * This is an extractor object. See
+ * https://docs.scala-lang.org/tour/extractor-objects.html.
+ */
+ object DeltaTableScan {
+
+ /**
+ * The components of DeltaTableScanType are:
+ * - the plan with removed projections. We remove projections as a plan
+ * differentiator because it does not affect file listing results.
+ * - filter expressions collected by `PhysicalOperation`
+ * - the `TahoeLogFileIndex` of the matched `DeltaTable`
+ * - integer value of limit expression, if any
+ * - matched `DeltaTable`
+ */
+ private type DeltaTableScanType =
+ (LogicalPlan, Seq[Expression], TahoeLogFileIndex, Option[Int],
LogicalRelation)
+
+ /**
+ * This is an extractor method (basically, the opposite of a constructor)
+ * which takes in an object `plan` and tries to give back the arguments as a
+ * [[DeltaTableScanType]].
+ *
+ * Two shapes match: a `LocalLimit` with an integer literal over a Delta scan
+ * (only when limit pushdown is enabled and the filters are partition-only),
+ * and a plain Delta scan, for which the index's partition filters are
+ * prepended to the collected filters. Any other plan yields `None`.
+ */
+ def unapply(plan: LogicalPlan): Option[DeltaTableScanType] = {
+ val limitPushdownEnabled =
spark.conf.get(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED)
+
+ // Remove projections as a plan differentiator because it does not
affect file listing
+ // results. Plans with the same filters but different projections
therefore will not have
+ // duplicate delta indexes.
+ def canonicalizePlanForDeltaFileListing(plan: LogicalPlan): LogicalPlan
= {
+ val planWithRemovedProjections =
plan.transformWithPruning(_.containsPattern(PROJECT)) {
+ case p: Project if
p.projectList.forall(_.isInstanceOf[AttributeReference]) => p.child
+ }
+ planWithRemovedProjections
+ }
+
+ plan match {
+ case LocalLimit(IntegerLiteral(limit),
+ PhysicalOperation(_, filters, delta @ DeltaTable(fileIndex:
TahoeLogFileIndex)))
+ if limitPushdownEnabled && containsPartitionFiltersOnly(filters,
fileIndex) =>
+ Some((canonicalizePlanForDeltaFileListing(plan), filters, fileIndex,
Some(limit), delta))
+ case PhysicalOperation(
+ _,
+ filters,
+ delta @ DeltaTable(fileIndex: TahoeLogFileIndex)) =>
+ val allFilters = fileIndex.partitionFilters ++ filters
+ Some((canonicalizePlanForDeltaFileListing(plan), allFilters,
fileIndex, None, delta))
+
+ case _ => None
+ }
+ }
+
+ private def containsPartitionFiltersOnly(
+ filters: Seq[Expression],
+ fileIndex: TahoeLogFileIndex): Boolean = {
+ val partitionColumns =
fileIndex.snapshotAtAnalysis.metadata.partitionColumns
+ import DeltaTableUtils._
+ filters.forall(expr => !containsSubquery(expr) &&
+ isPredicatePartitionColumnsOnly(expr, partitionColumns, spark))
+ }
+ }
+}
+
+class PrepareDeltaScan(protected val spark: SparkSession)
+ extends PrepareDeltaScanBase // concrete rule; declares no members beyond the trait's
+
+object PrepareDeltaScanBase {
+
+ /**
+ * Optional callback function that is called after `getDeltaScanGenerator`
+ * is called by the PrepareDeltaScan rule. This is primarily used for testing
+ * purposes. It is `null` when no callback is installed.
+ */
+ @volatile private var onGetDeltaScanGeneratorCallback: DeltaScanGenerator =>
Unit = _
+
+ /**
+ * Run a thunk of code with the given callback function injected into the
+ * PrepareDeltaScan rule. The callback function is called after
+ * `getDeltaScanGenerator` is called by the PrepareDeltaScan rule. This is
+ * primarily used for testing purposes.
+ *
+ * The callback is reset to `null` (not to any previously installed value)
+ * once the thunk finishes, whether it returns normally or throws.
+ *
+ * @param callback function invoked with each generator while the thunk runs
+ * @param thunk code to evaluate with the callback installed
+ * @return the value produced by `thunk`
+ */
+ private[delta] def withCallbackOnGetDeltaScanGenerator[T](
+ callback: DeltaScanGenerator => Unit)(thunk: => T): T = {
+ try {
+ onGetDeltaScanGeneratorCallback = callback
+ thunk
+ } finally {
+ onGetDeltaScanGeneratorCallback = null
+ }
+ }
+}
+
+/**
+ * A [[TahoeFileIndex]] that uses a prepared scan to return the list of
+ * relevant files. This is injected into a query right before query planning by
+ * [[PrepareDeltaScan]] so that CBO and metering can accurately understand how
+ * much data will be read.
+ *
+ * @param versionScanned The version of the table that is being scanned, if a
+ * specific version has specifically been requested, e.g. by time travel.
+ */
+case class PreparedDeltaFileIndex(
+ override val spark: SparkSession,
+ override val deltaLog: DeltaLog,
+ override val path: Path,
+ preparedScan: DeltaScan,
+ versionScanned: Option[Long])
+ extends TahoeFileIndexWithSnapshot(spark, deltaLog, path,
preparedScan.scannedSnapshot)
+ with DeltaLogging {
+
+ /**
+ * Returns all matching/valid files by the given `partitionFilters` and
+ * `dataFilters`.
+ *
+ * When the combined filters equal the prepared scan's `allFilters` or its
+ * `filtersUsedForSkipping`, the prepared scan's files are reused (after
+ * `distinct`); otherwise the files are reselected from the scanned snapshot.
+ * In both cases a `delta.preparedDeltaFileIndex.reuseSkippingResult` event is
+ * recorded with details of the decision.
+ */
+ override def matchingFiles(
+ partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression]): Seq[AddFile] = {
+ val currentFilters = ExpressionSet(partitionFilters ++ dataFilters)
+ val (addFiles, eventData) = if (currentFilters == preparedScan.allFilters
||
+ currentFilters == preparedScan.filtersUsedForSkipping) {
+ // [[DeltaScan]] was created using `allFilters` out of which only
`filtersUsedForSkipping`
+ // filters were used for skipping while creating the DeltaScan.
+ // If currentFilters is same as allFilters, then no need to recalculate
files and we can use
+ // previous results.
+ // If currentFilters is same as filtersUsedForSkipping, then also we
don't need to recalculate
+ // files as [[DeltaScan.files]] were calculated using
filtersUsedForSkipping only. So if we
+ // recalculate, we will get same result. So we should use previous
result in this case also.
+ val eventData = Map(
+ "reused" -> true,
+ "currentFiltersSameAsPreparedAllFilters" -> (currentFilters ==
preparedScan.allFilters),
+ "currentFiltersSameAsPreparedFiltersUsedForSkipping" ->
+ (currentFilters == preparedScan.filtersUsedForSkipping)
+ )
+ (preparedScan.files.distinct, eventData)
+ } else {
+ logInfo(
+ s"""
+ |Prepared scan does not match actual filters. Reselecting files to
query.
+ |Prepared: ${preparedScan.allFilters}
+ |Actual: ${currentFilters}
+ """.stripMargin)
+ val eventData = Map(
+ "reused" -> false,
+ "preparedAllFilters" -> preparedScan.allFilters.mkString(","),
+ "preparedFiltersUsedForSkipping" ->
preparedScan.filtersUsedForSkipping.mkString(","),
+ "currentFilters" -> currentFilters.mkString(",")
+ )
+ val files = preparedScan.scannedSnapshot.filesForScan(partitionFilters
++ dataFilters).files
+ (files, eventData)
+ }
+ recordDeltaEvent(deltaLog,
+ opType = "delta.preparedDeltaFileIndex.reuseSkippingResult",
+ data = eventData)
+ addFiles
+ }
+
+ /**
+ * Returns the list of files that will be read when scanning this relation.
+ * This call may be very expensive for large tables. The paths come from the
+ * prepared scan's files, each resolved with `absolutePath`.
+ */
+ override def inputFiles: Array[String] =
+ preparedScan.files.map(f => absolutePath(f.path).toString).toArray
+
+ /** Refresh any cached file listings; the body is empty, so this does nothing here. */
+ override def refresh(): Unit = { }
+
+ /**
+ * Sum of table file sizes, in bytes. Uses the prepared scan's
+ * `scanned.bytesCompressed` and falls back to the session's
+ * `defaultSizeInBytes` when that value is absent.
+ */
+ override def sizeInBytes: Long =
+ preparedScan.scanned.bytesCompressed
+ .getOrElse(spark.sessionState.conf.defaultSizeInBytes)
+
+ override def equals(other: Any): Boolean = other match {
+ case p: PreparedDeltaFileIndex =>
+ p.deltaLog == deltaLog && p.path == path && p.preparedScan ==
preparedScan &&
+ p.partitionSchema == partitionSchema && p.versionScanned ==
versionScanned
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ Objects.hash(deltaLog, path, preparedScan, partitionSchema, versionScanned)
+ }
+
+}
diff --git
a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
index e747c87c6a..ba4c21df3a 100644
---
a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
+++
b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
@@ -27,12 +27,7 @@ import scala.collection.JavaConverters._
class ClickhouseTable(
@transient private val _df: Dataset[Row],
@transient private val table: ClickHouseTableV2)
- extends DeltaTable(_df, table) {
-
- override def optimize(): DeltaOptimizeBuilder = {
- DeltaOptimizeBuilder(table)
- }
-}
+ extends DeltaTable(_df, table) {}
object ClickhouseTable {
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 2f5824b580..9097a02b93 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -196,8 +196,9 @@ class ClickhouseOptimisticTransaction(
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
- if (isOptimize)
+ if (isOptimize) {
throw new UnsupportedOperationException("Optimize is not supported for
ClickHouse")
+ }
hasWritten = true
@@ -258,7 +259,7 @@ class ClickhouseOptimisticTransaction(
DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns,
deltaLog)
} else {
checkInvariants
- }*/
+ } */
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
@@ -304,10 +305,11 @@ class ClickhouseOptimisticTransaction(
committer.addedStatuses
})
.filter {
- // In some cases, we can write out an empty `inputData`. Some
examples of this (though, they
- // may be fixed in the future) are the MERGE command when you delete
with empty source, or
- // empty target, or on disjoint tables. This is hard to catch before
the write without
- // collecting the DF ahead of time. Instead, we can return only the
AddFiles that
+ // In some cases, we can write out an empty `inputData`. Some
examples of this (though,
+ // they may be fixed in the future) are the MERGE command when you
delete with empty
+ // source, or empty target, or on disjoint tables. This is hard to
catch before
+ // the write without collecting the DF ahead of time. Instead,
+ // we can return only the AddFiles that
// a) actually add rows, or
// b) don't have any stats so we don't know the number of rows at all
case a: AddFile => a.numLogicalRecords.forall(_ > 0)
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
index bac5231309..f64de28f42 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
@@ -60,7 +60,7 @@ import org.apache.spark.util._
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.0, it is modified to overcome the
following issues:
+ * This file is copied from Delta 3.2.1, it is modified to overcome the
following issues:
* 1. return ClickhouseOptimisticTransaction
* 2. return DeltaMergeTreeFileFormat
* 3. create HadoopFsRelation with the bucket options
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
index 8836f7c88d..5bfda914db 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.0. It is modified to overcome the
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
* 1. filesForScan() will cache the DeltaScan by the FilterExprsAsKey
* 2. filesForScan() should return DeltaScan of AddMergeTreeParts instead of
AddFile
*/
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 5f6a2dc3d7..d887e7a21b 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.V1Table
import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog,
DeltaTableUtils, DeltaTimeTravelSpec, Snapshot, UnresolvedPathBasedDeltaTable}
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.sources.DeltaDataSource
@@ -89,13 +88,6 @@ class ClickHouseTableV2(
ret
}
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- new WriteIntoDeltaBuilder(
- this,
- info.options,
- spark.sessionState.conf.useNullsForMissingDefaultColumnValues)
- }
-
def getFileFormat(protocol: Protocol, meta: Metadata):
DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
protocol,
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index dec1f4b9c3..0a25346fc6 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.types.LongType
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.0. It is modified to overcome the
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
* 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
* it so that it return a a list of filenames (concated by ',').
*/
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 5b21702202..439111df1b 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util.{SystemClock, ThreadUtils}
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.0. It is modified in:
+ * This file is copied from Delta 3.2.1. It is modified in:
* 1. getDeltaTable supports to get ClickHouseTableV2
* 2. runOptimizeBinJobClickhouse
* 3. groupFilesIntoBinsClickhouse
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 9a7fb96775..4e75b84619 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.types.LongType
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.0. It is modified to overcome the
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
* 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
* it so that it return a a list of filenames (concated by ',').
*/
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 5d05bdb868..7a350ae4d5 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.{Clock,
SerializableConfiguration, SystemClock}
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.0. It is modified to overcome the
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
* 1. In Gluten, part is a directory, but VacuumCommand assumes part is a
file. So we need some
* modifications to make it work.
* 2. Set the 'gluten.enabledForCurrentThread' to false, now gluten can not
support vacuum cmd.
@@ -255,7 +255,8 @@ object VacuumCommand extends VacuumCommandImpl with
Serializable {
val originalEnabledGluten =
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
// gluten can not support vacuum command
-
spark.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
"false")
+ spark.sparkContext
+
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
// --- modified end
val validFiles =
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
index 42a89d4271..aa1f94c5c9 100644
---
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.functions.{coalesce, col, count,
input_file_name, li
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.0. It is modified to overcome the
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
* 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
* it so that it return a a list of filenames (concated by ','). In
findTouchedFiles func.
*/
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
new file mode 100644
index 0000000000..dbb5c4050a
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.rules
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.{OptimisticTransaction, Snapshot,
SubqueryTransformerHelper}
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.DeltaScanGenerator
+
+import org.apache.hadoop.fs.Path
+
+class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with DeltaLogging
+ with SubqueryTransformerHelper
+ with OptimizeMetadataOnlyDeltaQuery {
+
+ private val scannedSnapshots =
+ new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]
+
+ protected def getDeltaScanGenerator(index: TahoeLogFileIndex):
DeltaScanGenerator = {
+ // The first case means that we've fixed the table snapshot for time travel
+ if (index.isTimeTravelQuery) return index.getSnapshot
+ OptimisticTransaction
+ .getActive()
+ .map(_.getDeltaScanGenerator(index))
+ .getOrElse {
+ // Will be called only when the log is accessed the first time
+ scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ =>
index.getSnapshot)
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ // Should not be applied to subqueries to avoid duplicate delta jobs.
+ val isSubquery = isSubqueryRoot(plan)
+ // Should not be applied to DataSourceV2 write plans, because they'll be
planned later
+ // through a V1 fallback and only that later planning takes place within
the transaction.
+ val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
+ if (isSubquery || isDataSourceV2) {
+ return plan
+ }
+ // when 'stats.skipping' is off, it still uses the metadata to optimize
queries for count/min/max
+ if (
+ spark.sessionState.conf
+ .getConfString(
+ CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE,
+
CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE)
+ .toBoolean &&
+ !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING, true)
+ ) {
+ optimizeQueryWithMetadata(plan)
+ } else {
+ plan
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 9a1b00f714..ba17d12ffa 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -133,6 +133,10 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
val GLUTEN_CLICKHOUSE_TABLE_PATH_TO_MTPS_CACHE_SIZE: String =
CHConf.prefixOf("table.path.to.mtps.cache.size")
+ val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE: String =
+ CHConf.prefixOf("delta.metadata.optimize")
+ val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE: String = "true"
+
def affinityMode: String = {
SparkEnv.get.conf
.get(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index dea0d50c9d..470ece4037 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -30,6 +30,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule,
EqualToRewrite}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.DeltaLogFileIndex
+import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
CommandResultExec, FileSourceScanExec, GlutenFallbackReporter, RDDScanExec,
SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
@@ -59,6 +60,7 @@ private object CHRuleApi {
injector.injectOptimizerRule(spark =>
CHAggregateFunctionRewriteRule(spark))
injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
injector.injectOptimizerRule(_ => EqualToRewrite)
+ injector.injectPreCBORule(spark => new
CHOptimizeMetadataOnlyDeltaQuery(spark))
}
def injectLegacy(injector: LegacyInjector): Unit = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index c1210c5fba..033d514671 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1977,24 +1977,29 @@ class GlutenClickHouseMergeTreeWriteSuite
| select * from lineitem
|""".stripMargin)
- val sqlStr =
- s"""
- |SELECT
- | count(*) AS count_order
- |FROM
- | lineitem_mergetree_count_opti
- |""".stripMargin
- runSql(sqlStr)(
- df => {
- val result = df.collect()
- assertResult(1)(result.length)
- assertResult("600572")(result(0).getLong(0).toString)
+ Seq("true", "false").foreach {
+ skip =>
+ withSQLConf("spark.databricks.delta.stats.skipping" -> skip.toString) {
+ val sqlStr =
+ s"""
+ |SELECT
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_count_opti
+ |""".stripMargin
+ runSql(sqlStr)(
+ df => {
+ val result = df.collect()
+ assertResult(1)(result.length)
+ assertResult("600572")(result(0).getLong(0).toString)
- // Spark 3.2 + Delta 2.0 does not support this feature
- if (!spark32) {
-
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+ // Spark 3.2 + Delta 2.0 does not support this feature
+ if (!spark32) {
+
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+ }
+ })
}
- })
+ }
}
test("test mergetree with column case sensitive") {
@@ -2128,4 +2133,86 @@ class GlutenClickHouseMergeTreeWriteSuite
}
})
}
+
+ test(
+ "GLUTEN-7812: Fix the query failed for the mergetree format " +
+ "when the 'spark.databricks.delta.stats.skipping' is off") {
+ // Spark 3.2 + Delta 2.0 does not support this feature
+ if (!spark32) {
+ withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_stats_skipping;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS
lineitem_mergetree_stats_skipping
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |TBLPROPERTIES (orderByKey='l_orderkey',
+ | primaryKey='l_orderkey')
+ |LOCATION '$basePath/lineitem_mergetree_stats_skipping'
+ |""".stripMargin)
+
+ // dynamic partitions
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_stats_skipping
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | o_orderpriority,
+ | count(*) AS order_count
+ |FROM
+ | orders
+ |WHERE
+ | o_orderdate >= date'1993-07-01'
+ | AND o_orderdate < date'1993-07-01' + interval 3 month
+ | AND EXISTS (
+ | SELECT
+ | *
+ | FROM
+ | lineitem
+ | WHERE
+ | l_orderkey = o_orderkey
+ | AND l_commitdate < l_receiptdate)
+ |GROUP BY
+ | o_orderpriority
+ |ORDER BY
+ | o_orderpriority;
+ |
+ |""".stripMargin
+ runSql(sqlStr)(
+ df => {
+ val result = df.collect()
+ assertResult(5)(result.length)
+ assertResult("1-URGENT")(result(0).getString(0))
+ assertResult(999)(result(0).getLong(1))
+ assertResult("2-HIGH")(result(1).getString(0))
+ assertResult(997)(result(1).getLong(1))
+ assertResult("5-LOW")(result(4).getString(0))
+ assertResult(1077)(result(4).getLong(1))
+ })
+ }
+ }
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
index 87942c4155..fe6db65c3a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
@@ -49,4 +49,8 @@ class SparkInjector private[injector] (
def injectFunction(functionDescription: FunctionDescription): Unit = {
extensions.injectFunction(control.disabler().wrapFunction(functionDescription))
}
+
+ def injectPreCBORule(builder: RuleBuilder): Unit = {
+ extensions.injectPreCBORule(control.disabler().wrapRule(builder))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]