This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ddf28c097b [GLUTEN-7812][CH] Fix the query failed for the mergetree 
format when the 'spark.databricks.delta.stats.skipping' is off (#7813)
ddf28c097b is described below

commit ddf28c097b0dfe2fbba7c2a4fdbfee5323bd833d
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Nov 6 07:13:51 2024 +0800

    [GLUTEN-7812][CH] Fix the query failed for the mergetree format when the 
'spark.databricks.delta.stats.skipping' is off (#7813)
    
    For the Spark 3.3 + Delta 2.3, when the 
'spark.databricks.delta.stats.skipping' is off, there are some queries failed 
with the subquery, the error message is below:
    
    ```
    java.lang.IllegalArgumentException: requirement failed
            at scala.Predef$.require(Predef.scala:268)
            at 
org.apache.spark.sql.delta.SubqueryTransformerHelper.transformWithSubqueries(SubqueryTransformerHelper.scala:42)
            at 
org.apache.spark.sql.delta.SubqueryTransformerHelper.transformWithSubqueries$(SubqueryTransformerHelper.scala:40)
            at 
org.apache.spark.sql.delta.stats.PrepareDeltaScan.transformWithSubqueries(PrepareDeltaScan.scala:291)
            at 
org.apache.spark.sql.delta.PreprocessTableWithDVs.preprocessTablesWithDVs(PreprocessTableWithDVs.scala:67)
            at 
org.apache.spark.sql.delta.PreprocessTableWithDVs.preprocessTablesWithDVs$(PreprocessTableWithDVs.scala:66)
            at 
org.apache.spark.sql.delta.stats.PrepareDeltaScan.preprocessTablesWithDVs(PrepareDeltaScan.scala:291)
            at 
org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply(PrepareDeltaScan.scala:227)
            at 
org.apache.spark.sql.delta.stats.PrepareDeltaScanBase.apply$(PrepareDeltaScan.scala:191)
            at 
org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:291)
            at 
org.apache.spark.sql.delta.stats.PrepareDeltaScan.apply(PrepareDeltaScan.scala:291)
    ```
    
    Close #7812.
---
 backends-clickhouse/pom.xml                        |   1 +
 .../rules/CHOptimizeMetadataOnlyDeltaQuery.scala   |  30 ++
 .../rules/CHOptimizeMetadataOnlyDeltaQuery.scala   |  77 ++++
 .../spark/sql/delta/stats/PrepareDeltaScan.scala   | 406 +++++++++++++++++++++
 .../delta-32/io/delta/tables/ClickhouseTable.scala |   7 +-
 .../delta/ClickhouseOptimisticTransaction.scala    |  14 +-
 .../org/apache/spark/sql/delta/DeltaLog.scala      |   2 +-
 .../org/apache/spark/sql/delta/Snapshot.scala      |   2 +-
 .../sql/delta/catalog/ClickHouseTableV2.scala      |   8 -
 .../spark/sql/delta/commands/DeleteCommand.scala   |   2 +-
 .../sql/delta/commands/OptimizeTableCommand.scala  |   2 +-
 .../spark/sql/delta/commands/UpdateCommand.scala   |   2 +-
 .../spark/sql/delta/commands/VacuumCommand.scala   |   5 +-
 .../commands/merge/ClassicMergeExecutor.scala      |   2 +-
 .../rules/CHOptimizeMetadataOnlyDeltaQuery.scala   |  77 ++++
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   4 +
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   2 +
 .../GlutenClickHouseMergeTreeWriteSuite.scala      | 119 +++++-
 .../gluten/extension/injector/SparkInjector.scala  |   4 +
 19 files changed, 722 insertions(+), 44 deletions(-)

diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index b00470f9ea..0f593a861b 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -368,6 +368,7 @@
             <excludes>
               
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
               
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
+              
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/stats/*.scala</exclude>
               
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
               
<exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>
             </excludes>
diff --git 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
new file mode 100644
index 0000000000..a360fa8d72
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.rules
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.metering.DeltaLogging
+
+class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with DeltaLogging {
+
+  // For Delta 2.0, it can not support to optimize query with the metadata
+  override def apply(plan: LogicalPlan): LogicalPlan = plan
+}
diff --git 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
new file mode 100644
index 0000000000..dbb5c4050a
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.rules
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.{OptimisticTransaction, Snapshot, 
SubqueryTransformerHelper}
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.DeltaScanGenerator
+
+import org.apache.hadoop.fs.Path
+
+class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with DeltaLogging
+  with SubqueryTransformerHelper
+  with OptimizeMetadataOnlyDeltaQuery {
+
+  private val scannedSnapshots =
+    new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]
+
+  protected def getDeltaScanGenerator(index: TahoeLogFileIndex): 
DeltaScanGenerator = {
+    // The first case means that we've fixed the table snapshot for time travel
+    if (index.isTimeTravelQuery) return index.getSnapshot
+    OptimisticTransaction
+      .getActive()
+      .map(_.getDeltaScanGenerator(index))
+      .getOrElse {
+        // Will be called only when the log is accessed the first time
+        scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => 
index.getSnapshot)
+      }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Should not be applied to subqueries to avoid duplicate delta jobs.
+    val isSubquery = isSubqueryRoot(plan)
+    // Should not be applied to DataSourceV2 write plans, because they'll be 
planned later
+    // through a V1 fallback and only that later planning takes place within 
the transaction.
+    val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
+    if (isSubquery || isDataSourceV2) {
+      return plan
+    }
+    // when 'stats.skipping' is off, it still use the metadata to optimize 
query for count/min/max
+    if (
+      spark.sessionState.conf
+        .getConfString(
+          CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE,
+          
CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE)
+        .toBoolean &&
+      !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING, true)
+    ) {
+      optimizeQueryWithMetadata(plan)
+    } else {
+      plan
+    }
+  }
+}
diff --git 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala
 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala
new file mode 100644
index 0000000000..21e31d3541
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.stats
+
+import java.util.Objects
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.files.{TahoeFileIndexWithSnapshot, 
TahoeLogFileIndex}
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.PROJECT
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+/**
+ * Gluten overwrite Delta:
+ *
+ * This file is copied from Delta 2.3.0, it is modified to overcome the 
following issues:
+ *   1. Returns the plan directly even if stats.skipping is turned off
+ */
+
+/**
+ * Before query planning, we prepare any scans over delta tables by pushing
+ * any projections or filters in allowing us to gather more accurate statistics
+ * for CBO and metering.
+ *
+ * Note the following
+ * - This rule also ensures that all reads from the same delta log use the 
same snapshot of log
+ *   thus providing snapshot isolation.
+ * - If this rule is invoked within an active [[OptimisticTransaction]], then 
the scans are
+ *   generated using the transaction.
+ */
+trait PrepareDeltaScanBase extends Rule[LogicalPlan]
+  with PredicateHelper
+  with DeltaLogging
+  with OptimizeMetadataOnlyDeltaQuery
+  with PreprocessTableWithDVs { self: PrepareDeltaScan =>
+
+  /**
+   * Tracks the first-access snapshots of other logs planned by this rule. The 
snapshots are
+   * the keyed by the log's unique id. Note that the lifetime of this rule is 
a single
+   * query, therefore, the map tracks the snapshots only within a query.
+   */
+  private val scannedSnapshots =
+    new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]
+
+  /**
+   * Gets the [[DeltaScanGenerator]] for the given log, which will be used to 
generate
+   * [[DeltaScan]]s. Every time this method is called on a log within the 
lifetime of this
+   * rule (i.e., the lifetime of the query for which this rule was 
instantiated), the returned
+   * generator will read a snapshot that is pinned on the first access for 
that log.
+   *
+   * Internally, it will use the snapshot of the file index, the snapshot of 
the active transaction
+   * (if any), or the latest snapshot of the given log.
+   */
+  protected def getDeltaScanGenerator(index: TahoeLogFileIndex): 
DeltaScanGenerator = {
+    // The first case means that we've fixed the table snapshot for time travel
+    if (index.isTimeTravelQuery) return index.getSnapshot
+    val scanGenerator = OptimisticTransaction.getActive()
+      .map(_.getDeltaScanGenerator(index))
+      .getOrElse {
+        // Will be called only when the log is accessed the first time
+        scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => 
index.getSnapshot)
+      }
+    import PrepareDeltaScanBase._
+    if (onGetDeltaScanGeneratorCallback != null) 
onGetDeltaScanGeneratorCallback(scanGenerator)
+    scanGenerator
+  }
+
+  /**
+   * Helper method to generate a [[PreparedDeltaFileIndex]]
+   */
+  protected def getPreparedIndex(
+      preparedScan: DeltaScan,
+      fileIndex: TahoeLogFileIndex): PreparedDeltaFileIndex = {
+    assert(fileIndex.partitionFilters.isEmpty,
+      "Partition filters should have been extracted by DeltaAnalysis.")
+    PreparedDeltaFileIndex(
+      spark,
+      fileIndex.deltaLog,
+      fileIndex.path,
+      preparedScan,
+      fileIndex.versionToUse)
+  }
+
+  /**
+   * Scan files using the given `filters` and return `DeltaScan`.
+   *
+   * Note: when `limitOpt` is non empty, `filters` must contain only partition 
filters. Otherwise,
+   * it can contain arbitrary filters. See `DeltaTableScan` for more details.
+   */
+  protected def filesForScan(
+      scanGenerator: DeltaScanGenerator,
+      limitOpt: Option[Int],
+      filters: Seq[Expression],
+      delta: LogicalRelation): DeltaScan = {
+    withStatusCode("DELTA", "Filtering files for query") {
+      if (limitOpt.nonEmpty) {
+        // If we trigger limit push down, the filters must be partition 
filters. Since
+        // there are no data filters, we don't need to apply Generated Columns
+        // optimization. See `DeltaTableScan` for more details.
+        return scanGenerator.filesForScan(limitOpt.get, filters)
+      }
+      val filtersForScan =
+        if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
+          filters
+        } else {
+          val generatedPartitionFilters = 
GeneratedColumn.generatePartitionFilters(
+            spark, scanGenerator.snapshotToScan, filters, delta)
+          filters ++ generatedPartitionFilters
+        }
+      scanGenerator.filesForScan(filtersForScan)
+    }
+  }
+
+  /**
+   * Prepares delta scans sequentially.
+   */
+  protected def prepareDeltaScan(plan: LogicalPlan): LogicalPlan = {
+    // A map from the canonicalized form of a DeltaTableScan operator to its 
corresponding delta
+    // scan. This map is used to avoid fetching duplicate delta indexes for 
structurally-equal
+    // delta scans.
+    val deltaScans = new mutable.HashMap[LogicalPlan, DeltaScan]()
+
+    transformWithSubqueries(plan) {
+        case scan @ DeltaTableScan(planWithRemovedProjections, filters, 
fileIndex,
+          limit, delta) =>
+          val scanGenerator = getDeltaScanGenerator(fileIndex)
+          val preparedScan = 
deltaScans.getOrElseUpdate(planWithRemovedProjections.canonicalized,
+              filesForScan(scanGenerator, limit, filters, delta))
+          val preparedIndex = getPreparedIndex(preparedScan, fileIndex)
+          optimizeGeneratedColumns(scan, preparedIndex, filters, limit, delta)
+      }
+  }
+
+  protected def optimizeGeneratedColumns(
+      scan: LogicalPlan,
+      preparedIndex: PreparedDeltaFileIndex,
+      filters: Seq[Expression],
+      limit: Option[Int],
+      delta: LogicalRelation): LogicalPlan = {
+    if (limit.nonEmpty) {
+      // If we trigger limit push down, the filters must be partition filters. 
Since
+      // there are no data filters, we don't need to apply Generated Columns
+      // optimization. See `DeltaTableScan` for more details.
+      return DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
+    }
+    if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
+      DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
+    } else {
+      val generatedPartitionFilters =
+        GeneratedColumn.generatePartitionFilters(spark, preparedIndex, 
filters, delta)
+      val scanWithFilters =
+        if (generatedPartitionFilters.nonEmpty) {
+          scan transformUp {
+            case delta @ DeltaTable(_: TahoeLogFileIndex) =>
+              Filter(generatedPartitionFilters.reduceLeft(And), delta)
+          }
+        } else {
+          scan
+        }
+      DeltaTableUtils.replaceFileIndex(scanWithFilters, preparedIndex)
+    }
+  }
+
+  override def apply(_plan: LogicalPlan): LogicalPlan = {
+    var plan = _plan
+
+    // --- modified start
+    // Should not be applied to subqueries to avoid duplicate delta jobs.
+    val isSubquery = isSubqueryRoot(plan)
+    // Should not be applied to DataSourceV2 write plans, because they'll be 
planned later
+    // through a V1 fallback and only that later planning takes place within 
the transaction.
+    val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
+    if (isSubquery || isDataSourceV2) {
+      return plan
+    }
+
+    val shouldPrepareDeltaScan = (
+      spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING)
+    )
+    val updatedPlan = if (shouldPrepareDeltaScan) {
+      if 
(spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED))
 {
+        plan = optimizeQueryWithMetadata(plan)
+      }
+      prepareDeltaScan(plan)
+    } else {
+      // If this query is running inside an active transaction and is touching 
the same table
+      // as the transaction, then mark that the entire table as tainted to be 
safe.
+      OptimisticTransaction.getActive.foreach { txn =>
+        val logsInPlan = plan.collect { case DeltaTable(fileIndex) => 
fileIndex.deltaLog }
+        if (logsInPlan.exists(_.isSameLogAs(txn.deltaLog))) {
+          txn.readWholeTable()
+        }
+      }
+
+      // Just return the plan if statistics based skipping is off.
+      // It will fall back to just partition pruning at planning time.
+      plan
+    }
+    // --- modified end
+    preprocessTablesWithDVs(updatedPlan)
+  }
+
+  /**
+   * This is an extractor object. See 
https://docs.scala-lang.org/tour/extractor-objects.html.
+   */
+  object DeltaTableScan {
+
+    /**
+     * The components of DeltaTableScanType are:
+     * - the plan with removed projections. We remove projections as a plan 
differentiator
+     * because it does not affect file listing results.
+     * - filter expressions collected by `PhysicalOperation`
+     * - the `TahoeLogFileIndex` of the matched DeltaTable`
+     * - integer value of limit expression, if any
+     * - matched `DeltaTable`
+     */
+    private type DeltaTableScanType =
+      (LogicalPlan, Seq[Expression], TahoeLogFileIndex, Option[Int], 
LogicalRelation)
+
+    /**
+     * This is an extractor method (basically, the opposite of a constructor) 
which takes in an
+     * object `plan` and tries to give back the arguments as a 
[[DeltaTableScanType]].
+     */
+    def unapply(plan: LogicalPlan): Option[DeltaTableScanType] = {
+      val limitPushdownEnabled = 
spark.conf.get(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED)
+
+      // Remove projections as a plan differentiator because it does not 
affect file listing
+      // results. Plans with the same filters but different projections 
therefore will not have
+      // duplicate delta indexes.
+      def canonicalizePlanForDeltaFileListing(plan: LogicalPlan): LogicalPlan 
= {
+        val planWithRemovedProjections = 
plan.transformWithPruning(_.containsPattern(PROJECT)) {
+          case p: Project if 
p.projectList.forall(_.isInstanceOf[AttributeReference]) => p.child
+        }
+        planWithRemovedProjections
+      }
+
+      plan match {
+        case LocalLimit(IntegerLiteral(limit),
+          PhysicalOperation(_, filters, delta @ DeltaTable(fileIndex: 
TahoeLogFileIndex)))
+            if limitPushdownEnabled && containsPartitionFiltersOnly(filters, 
fileIndex) =>
+          Some((canonicalizePlanForDeltaFileListing(plan), filters, fileIndex, 
Some(limit), delta))
+        case PhysicalOperation(
+            _,
+            filters,
+            delta @ DeltaTable(fileIndex: TahoeLogFileIndex)) =>
+          val allFilters = fileIndex.partitionFilters ++ filters
+          Some((canonicalizePlanForDeltaFileListing(plan), allFilters, 
fileIndex, None, delta))
+
+        case _ => None
+      }
+    }
+
+    private def containsPartitionFiltersOnly(
+        filters: Seq[Expression],
+        fileIndex: TahoeLogFileIndex): Boolean = {
+      val partitionColumns = 
fileIndex.snapshotAtAnalysis.metadata.partitionColumns
+      import DeltaTableUtils._
+      filters.forall(expr => !containsSubquery(expr) &&
+        isPredicatePartitionColumnsOnly(expr, partitionColumns, spark))
+    }
+  }
+}
+
+class PrepareDeltaScan(protected val spark: SparkSession)
+  extends PrepareDeltaScanBase
+
+object PrepareDeltaScanBase {
+
+  /**
+   * Optional callback function that is called after `getDeltaScanGenerator` 
is called
+   * by the PrepareDeltaScan rule. This is primarily used for testing purposes.
+   */
+  @volatile private var onGetDeltaScanGeneratorCallback: DeltaScanGenerator => 
Unit = _
+
+  /**
+   * Run a thunk of code with the given callback function injected into the 
PrepareDeltaScan rule.
+   * The callback function is called after `getDeltaScanGenerator` is called
+   * by the PrepareDeltaScan rule. This is primarily used for testing purposes.
+   */
+  private[delta] def withCallbackOnGetDeltaScanGenerator[T](
+      callback: DeltaScanGenerator => Unit)(thunk: => T): T = {
+    try {
+      onGetDeltaScanGeneratorCallback = callback
+      thunk
+    } finally {
+      onGetDeltaScanGeneratorCallback = null
+    }
+  }
+}
+
+/**
+ * A [[TahoeFileIndex]] that uses a prepared scan to return the list of 
relevant files.
+ * This is injected into a query right before query planning by 
[[PrepareDeltaScan]] so that
+ * CBO and metering can accurately understand how much data will be read.
+ *
+ * @param versionScanned The version of the table that is being scanned, if a 
specific version
+ *                       has specifically been requested, e.g. by time travel.
+ */
+case class PreparedDeltaFileIndex(
+    override val spark: SparkSession,
+    override val deltaLog: DeltaLog,
+    override val path: Path,
+    preparedScan: DeltaScan,
+    versionScanned: Option[Long])
+  extends TahoeFileIndexWithSnapshot(spark, deltaLog, path, 
preparedScan.scannedSnapshot)
+  with DeltaLogging {
+
+  /**
+   * Returns all matching/valid files by the given `partitionFilters` and 
`dataFilters`
+   */
+  override def matchingFiles(
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): Seq[AddFile] = {
+    val currentFilters = ExpressionSet(partitionFilters ++ dataFilters)
+    val (addFiles, eventData) = if (currentFilters == preparedScan.allFilters 
||
+        currentFilters == preparedScan.filtersUsedForSkipping) {
+      // [[DeltaScan]] was created using `allFilters` out of which only 
`filtersUsedForSkipping`
+      // filters were used for skipping while creating the DeltaScan.
+      // If currentFilters is same as allFilters, then no need to recalculate 
files and we can use
+      // previous results.
+      // If currentFilters is same as filtersUsedForSkipping, then also we 
don't need to recalculate
+      // files as [[DeltaScan.files]] were calculates using 
filtersUsedForSkipping only. So if we
+      // recalculate, we will get same result. So we should use previous 
result in this case also.
+      val eventData = Map(
+        "reused" -> true,
+        "currentFiltersSameAsPreparedAllFilters" -> (currentFilters == 
preparedScan.allFilters),
+        "currentFiltersSameAsPreparedFiltersUsedForSkipping" ->
+          (currentFilters == preparedScan.filtersUsedForSkipping)
+      )
+      (preparedScan.files.distinct, eventData)
+    } else {
+      logInfo(
+        s"""
+           |Prepared scan does not match actual filters. Reselecting files to 
query.
+           |Prepared: ${preparedScan.allFilters}
+           |Actual: ${currentFilters}
+         """.stripMargin)
+      val eventData = Map(
+        "reused" -> false,
+        "preparedAllFilters" -> preparedScan.allFilters.mkString(","),
+        "preparedFiltersUsedForSkipping" -> 
preparedScan.filtersUsedForSkipping.mkString(","),
+        "currentFilters" -> currentFilters.mkString(",")
+      )
+      val files = preparedScan.scannedSnapshot.filesForScan(partitionFilters 
++ dataFilters).files
+      (files, eventData)
+    }
+    recordDeltaEvent(deltaLog,
+      opType = "delta.preparedDeltaFileIndex.reuseSkippingResult",
+      data = eventData)
+    addFiles
+  }
+
+  /**
+   * Returns the list of files that will be read when scanning this relation. 
This call may be
+   * very expensive for large tables.
+   */
+  override def inputFiles: Array[String] =
+    preparedScan.files.map(f => absolutePath(f.path).toString).toArray
+
+  /** Refresh any cached file listings */
+  override def refresh(): Unit = { }
+
+  /** Sum of table file sizes, in bytes */
+  override def sizeInBytes: Long =
+    preparedScan.scanned.bytesCompressed
+      .getOrElse(spark.sessionState.conf.defaultSizeInBytes)
+
+  override def equals(other: Any): Boolean = other match {
+    case p: PreparedDeltaFileIndex =>
+      p.deltaLog == deltaLog && p.path == path && p.preparedScan == 
preparedScan &&
+        p.partitionSchema == partitionSchema && p.versionScanned == 
versionScanned
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hash(deltaLog, path, preparedScan, partitionSchema, versionScanned)
+  }
+
+}
diff --git 
a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala 
b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
index e747c87c6a..ba4c21df3a 100644
--- 
a/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
+++ 
b/backends-clickhouse/src/main/delta-32/io/delta/tables/ClickhouseTable.scala
@@ -27,12 +27,7 @@ import scala.collection.JavaConverters._
 class ClickhouseTable(
     @transient private val _df: Dataset[Row],
     @transient private val table: ClickHouseTableV2)
-  extends DeltaTable(_df, table) {
-
-  override def optimize(): DeltaOptimizeBuilder = {
-    DeltaOptimizeBuilder(table)
-  }
-}
+  extends DeltaTable(_df, table) {}
 
 object ClickhouseTable {
 
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 2f5824b580..9097a02b93 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -196,8 +196,9 @@ class ClickhouseOptimisticTransaction(
       isOptimize: Boolean,
       additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
 
-    if (isOptimize)
+    if (isOptimize) {
       throw new UnsupportedOperationException("Optimize is not supported for 
ClickHouse")
+    }
 
     hasWritten = true
 
@@ -258,7 +259,7 @@ class ClickhouseOptimisticTransaction(
           DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, 
deltaLog)
         } else {
           checkInvariants
-        }*/
+        } */
       val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
 
       if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
@@ -304,10 +305,11 @@ class ClickhouseOptimisticTransaction(
          committer.addedStatuses
        })
         .filter {
-          // In some cases, we can write out an empty `inputData`. Some 
examples of this (though, they
-          // may be fixed in the future) are the MERGE command when you delete 
with empty source, or
-          // empty target, or on disjoint tables. This is hard to catch before 
the write without
-          // collecting the DF ahead of time. Instead, we can return only the 
AddFiles that
+          // In some cases, we can write out an empty `inputData`. Some 
examples of this (though,
+          // they may be fixed in the future) are the MERGE command when you 
delete with empty
+          // source, or empty target, or on disjoint tables. This is hard to 
catch before
+          // the write without collecting the DF ahead of time. Instead,
+          // we can return only the AddFiles that
           // a) actually add rows, or
           // b) don't have any stats so we don't know the number of rows at all
           case a: AddFile => a.numLogicalRecords.forall(_ > 0)
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
index bac5231309..f64de28f42 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/DeltaLog.scala
@@ -60,7 +60,7 @@ import org.apache.spark.util._
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.0, it is modified to overcome the 
following issues:
+ * This file is copied from Delta 3.2.1, it is modified to overcome the 
following issues:
  *   1. return ClickhouseOptimisticTransaction
  *   2. return DeltaMergeTreeFileFormat
  *   3. create HadoopFsRelation with the bucket options
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
index 8836f7c88d..5bfda914db 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/Snapshot.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.0. It is modified to overcome the 
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
  *   1. filesForScan() will cache the DeltaScan by the FilterExprsAsKey
  *   2. filesForScan() should return DeltaScan of AddMergeTreeParts instead of 
AddFile
  */
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 5f6a2dc3d7..d887e7a21b 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.V1Table
 import org.apache.spark.sql.connector.read.InputPartition
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, 
DeltaTableUtils, DeltaTimeTravelSpec, Snapshot, UnresolvedPathBasedDeltaTable}
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.delta.sources.DeltaDataSource
@@ -89,13 +88,6 @@ class ClickHouseTableV2(
     ret
   }
 
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteIntoDeltaBuilder(
-      this,
-      info.options,
-      spark.sessionState.conf.useNullsForMissingDefaultColumnValues)
-  }
-
   def getFileFormat(protocol: Protocol, meta: Metadata): 
DeltaMergeTreeFileFormat = {
     new DeltaMergeTreeFileFormat(
       protocol,
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index dec1f4b9c3..0a25346fc6 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.types.LongType
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.0. It is modified to overcome the 
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
  *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
  *      it so that it return a a list of filenames (concated by ',').
  */
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 5b21702202..439111df1b 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util.{SystemClock, ThreadUtils}
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.0. It is modified in:
+ * This file is copied from Delta 3.2.1. It is modified in:
  *   1. getDeltaTable supports to get ClickHouseTableV2
  *   2. runOptimizeBinJobClickhouse
  *   3. groupFilesIntoBinsClickhouse
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 9a7fb96775..4e75b84619 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.types.LongType
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.0. It is modified to overcome the 
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
  *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
  *      it so that it return a a list of filenames (concated by ',').
  */
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 5d05bdb868..7a350ae4d5 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -44,7 +44,7 @@ import org.apache.spark.util.{Clock, 
SerializableConfiguration, SystemClock}
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.0. It is modified to overcome the 
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
  *   1. In Gluten, part is a directory, but VacuumCommand assumes part is a 
file. So we need some
  *      modifications to make it work.
  *   2. Set the 'gluten.enabledForCurrentThread' to false, now gluten can not 
support vacuum cmd.
@@ -255,7 +255,8 @@ object VacuumCommand extends VacuumCommandImpl with 
Serializable {
       val originalEnabledGluten =
         
spark.sparkContext.getLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY)
       // gluten can not support vacuum command
-      
spark.sparkContext.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY,
 "false")
+      spark.sparkContext
+        
.setLocalProperty(GlutenSessionExtensions.GLUTEN_ENABLE_FOR_THREAD_KEY, "false")
       // --- modified end
 
       val validFiles =
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
index 42a89d4271..aa1f94c5c9 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.functions.{coalesce, col, count, 
input_file_name, li
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.0. It is modified to overcome the 
following issues:
+ * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
  *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
  *      it so that it return a a list of filenames (concated by ','). In 
findTouchedFiles func.
  */
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
new file mode 100644
index 0000000000..dbb5c4050a
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/rules/CHOptimizeMetadataOnlyDeltaQuery.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.rules
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
V2WriteCommand}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.{OptimisticTransaction, Snapshot, 
SubqueryTransformerHelper}
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.perf.OptimizeMetadataOnlyDeltaQuery
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.DeltaScanGenerator
+
+import org.apache.hadoop.fs.Path
+
+class CHOptimizeMetadataOnlyDeltaQuery(protected val spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with DeltaLogging
+  with SubqueryTransformerHelper
+  with OptimizeMetadataOnlyDeltaQuery {
+
+  private val scannedSnapshots =
+    new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]
+
+  protected def getDeltaScanGenerator(index: TahoeLogFileIndex): 
DeltaScanGenerator = {
+    // The first case means that we've fixed the table snapshot for time travel
+    if (index.isTimeTravelQuery) return index.getSnapshot
+    OptimisticTransaction
+      .getActive()
+      .map(_.getDeltaScanGenerator(index))
+      .getOrElse {
+        // Will be called only when the log is accessed the first time
+        scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => 
index.getSnapshot)
+      }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Should not be applied to subqueries to avoid duplicate delta jobs.
+    val isSubquery = isSubqueryRoot(plan)
+    // Should not be applied to DataSourceV2 write plans, because they'll be 
planned later
+    // through a V1 fallback and only that later planning takes place within 
the transaction.
+    val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand]
+    if (isSubquery || isDataSourceV2) {
+      return plan
+    }
+    // when 'stats.skipping' is off, it still use the metadata to optimize 
query for count/min/max
+    if (
+      spark.sessionState.conf
+        .getConfString(
+          CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE,
+          
CHBackendSettings.GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE)
+        .toBoolean &&
+      !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING, true)
+    ) {
+      optimizeQueryWithMetadata(plan)
+    } else {
+      plan
+    }
+  }
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 9a1b00f714..ba17d12ffa 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -133,6 +133,10 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
   val GLUTEN_CLICKHOUSE_TABLE_PATH_TO_MTPS_CACHE_SIZE: String =
     CHConf.prefixOf("table.path.to.mtps.cache.size")
 
+  val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE: String =
+    CHConf.prefixOf("delta.metadata.optimize")
+  val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE: String = "true"
+
   def affinityMode: String = {
     SparkEnv.get.conf
       .get(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index dea0d50c9d..470ece4037 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -30,6 +30,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, 
EqualToRewrite}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.delta.DeltaLogFileIndex
+import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
CommandResultExec, FileSourceScanExec, GlutenFallbackReporter, RDDScanExec, 
SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
@@ -59,6 +60,7 @@ private object CHRuleApi {
     injector.injectOptimizerRule(spark => 
CHAggregateFunctionRewriteRule(spark))
     injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
     injector.injectOptimizerRule(_ => EqualToRewrite)
+    injector.injectPreCBORule(spark => new 
CHOptimizeMetadataOnlyDeltaQuery(spark))
   }
 
   def injectLegacy(injector: LegacyInjector): Unit = {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index c1210c5fba..033d514671 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1977,24 +1977,29 @@ class GlutenClickHouseMergeTreeWriteSuite
                  | select * from lineitem
                  |""".stripMargin)
 
-    val sqlStr =
-      s"""
-         |SELECT
-         |    count(*) AS count_order
-         |FROM
-         |    lineitem_mergetree_count_opti
-         |""".stripMargin
-    runSql(sqlStr)(
-      df => {
-        val result = df.collect()
-        assertResult(1)(result.length)
-        assertResult("600572")(result(0).getLong(0).toString)
+    Seq("true", "false").foreach {
+      skip =>
+        withSQLConf("spark.databricks.delta.stats.skipping" -> skip.toString) {
+          val sqlStr =
+            s"""
+               |SELECT
+               |    count(*) AS count_order
+               |FROM
+               |    lineitem_mergetree_count_opti
+               |""".stripMargin
+          runSql(sqlStr)(
+            df => {
+              val result = df.collect()
+              assertResult(1)(result.length)
+              assertResult("600572")(result(0).getLong(0).toString)
 
-        // Spark 3.2 + Delta 2.0 does not support this feature
-        if (!spark32) {
-          
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+              // Spark 3.2 + Delta 2.0 does not support this feature
+              if (!spark32) {
+                
assert(df.queryExecution.executedPlan.isInstanceOf[LocalTableScanExec])
+              }
+            })
         }
-      })
+    }
   }
 
   test("test mergetree with column case sensitive") {
@@ -2128,4 +2133,86 @@ class GlutenClickHouseMergeTreeWriteSuite
         }
       })
   }
+
+  test(
+    "GLUTEN-7812: Fix the query failed for the mergetree format " +
+      "when the 'spark.databricks.delta.stats.skipping' is off") {
+    // Spark 3.2 + Delta 2.0 doesn't not support this feature
+    if (!spark32) {
+      withSQLConf(("spark.databricks.delta.stats.skipping", "false")) {
+        spark.sql(s"""
+                     |DROP TABLE IF EXISTS lineitem_mergetree_stats_skipping;
+                     |""".stripMargin)
+
+        spark.sql(s"""
+                     |CREATE TABLE IF NOT EXISTS 
lineitem_mergetree_stats_skipping
+                     |(
+                     | l_orderkey      bigint,
+                     | l_partkey       bigint,
+                     | l_suppkey       bigint,
+                     | l_linenumber    bigint,
+                     | l_quantity      double,
+                     | l_extendedprice double,
+                     | l_discount      double,
+                     | l_tax           double,
+                     | l_returnflag    string,
+                     | l_linestatus    string,
+                     | l_shipdate      date,
+                     | l_commitdate    date,
+                     | l_receiptdate   date,
+                     | l_shipinstruct  string,
+                     | l_shipmode      string,
+                     | l_comment       string
+                     |)
+                     |USING clickhouse
+                     |PARTITIONED BY (l_returnflag)
+                     |TBLPROPERTIES (orderByKey='l_orderkey',
+                     |               primaryKey='l_orderkey')
+                     |LOCATION '$basePath/lineitem_mergetree_stats_skipping'
+                     |""".stripMargin)
+
+        // dynamic partitions
+        spark.sql(s"""
+                     | insert into table lineitem_mergetree_stats_skipping
+                     | select * from lineitem
+                     |""".stripMargin)
+
+        val sqlStr =
+          s"""
+             |SELECT
+             |    o_orderpriority,
+             |    count(*) AS order_count
+             |FROM
+             |    orders
+             |WHERE
+             |    o_orderdate >= date'1993-07-01'
+             |    AND o_orderdate < date'1993-07-01' + interval 3 month
+             |    AND EXISTS (
+             |        SELECT
+             |            *
+             |        FROM
+             |            lineitem
+             |        WHERE
+             |            l_orderkey = o_orderkey
+             |            AND l_commitdate < l_receiptdate)
+             |GROUP BY
+             |    o_orderpriority
+             |ORDER BY
+             |    o_orderpriority;
+             |
+             |""".stripMargin
+        runSql(sqlStr)(
+          df => {
+            val result = df.collect()
+            assertResult(5)(result.length)
+            assertResult("1-URGENT")(result(0).getString(0))
+            assertResult(999)(result(0).getLong(1))
+            assertResult("2-HIGH")(result(1).getString(0))
+            assertResult(997)(result(1).getLong(1))
+            assertResult("5-LOW")(result(4).getString(0))
+            assertResult(1077)(result(4).getLong(1))
+          })
+      }
+    }
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
index 87942c4155..fe6db65c3a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/SparkInjector.scala
@@ -49,4 +49,8 @@ class SparkInjector private[injector] (
   def injectFunction(functionDescription: FunctionDescription): Unit = {
     
extensions.injectFunction(control.disabler().wrapFunction(functionDescription))
   }
+
+  def injectPreCBORule(builder: RuleBuilder): Unit = {
+    extensions.injectPreCBORule(control.disabler().wrapRule(builder))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to