This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c41e8baf0 [spark] Add data-evolution.merge-into.file-pruning option 
(#8065)
8c41e8baf0 is described below

commit 8c41e8baf0061806dd0abb5ef2a241afadc7f028
Author: Liurnly <[email protected]>
AuthorDate: Tue Jun 2 15:50:00 2026 +0800

    [spark] Add data-evolution.merge-into.file-pruning option (#8065)
    
    Add `data-evolution.merge-into.file-pruning` for MergeInto partial
    column update on data-evolution tables. When disabled, this option skips
    the file-level pruning step. It is useful when most files in the target
    partition are expected to be updated, so the overhead of collecting
    touched file IDs outweighs the benefit of pruning untouched files.
    
    When file pruning is skipped, Spark merge into still pushes down
    target-table partition filters from the MERGE ON condition to avoid
    scanning unrelated partitions.
---
 docs/generated/core_configuration.html             |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  15 ++
 .../MergeIntoPaimonDataEvolutionTable.scala        |  56 ++++++-
 .../MergeIntoPaimonDataEvolutionTable.scala        |  56 ++++++-
 .../paimon/spark/sql/RowTrackingTestBase.scala     | 178 ++++++++++++++++++++-
 5 files changed, 303 insertions(+), 8 deletions(-)

diff --git a/docs/generated/core_configuration.html 
b/docs/generated/core_configuration.html
index ebd8a5aca7..8f7ea2eaae 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -470,6 +470,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether enable data evolution for row tracking table.</td>
         </tr>
+        <tr>
+            <td><h5>data-evolution.merge-into.file-pruning</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If true, enables the file-level pruning step for MergeInto 
partial column update on data-evolution tables. Set this to false when most 
files in the target partition are expected to be updated, so that the overhead 
of collecting touched file IDs outweighs the benefit of pruning untouched 
files.</td>
+        </tr>
         <tr>
             <td><h5>data-file.external-paths</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f09a6edb4a..03140e9ecc 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2238,6 +2238,17 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether enable data evolution for row 
tracking table.");
 
+    public static final ConfigOption<Boolean> 
DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING =
+            key("data-evolution.merge-into.file-pruning")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "If true, enables the file-level pruning step for 
MergeInto partial column "
+                                    + "update on data-evolution tables. "
+                                    + "Set this to false when most files in 
the target partition are expected "
+                                    + "to be updated, so that the overhead of 
collecting touched file IDs "
+                                    + "outweighs the benefit of pruning 
untouched files.");
+
     public static final ConfigOption<Boolean> BLOB_COMPACTION_ENABLED =
             key("blob-compaction.enabled")
                     .booleanType()
@@ -3744,6 +3755,10 @@ public class CoreOptions implements Serializable {
         return options.get(DATA_EVOLUTION_ENABLED);
     }
 
+    public boolean dataEvolutionMergeIntoFilePruning() {
+        return options.get(DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING);
+    }
+
     public boolean blobCompactionEnabled() {
         return options.get(BLOB_COMPACTION_ENABLED);
     }
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index c5f9e02715..9ce5b87386 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -27,16 +27,20 @@ import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.source.snapshot.SnapshotReader
+import org.apache.paimon.types.RowType
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils._
 import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
EqualTo, Expression, ExprId, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, 
AttributeReference, EqualTo, Expression, ExprId, Literal, PythonUDF, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -60,7 +64,9 @@ case class MergeIntoPaimonDataEvolutionTable(
     notMatchedActions: Seq[MergeAction],
     notMatchedBySourceActions: Seq[MergeAction])
   extends PaimonLeafRunnableCommand
-  with WithFileStoreTable {
+  with WithFileStoreTable
+  with ExpressionHelper
+  with Logging {
 
   private lazy val writer = PaimonSparkWriter(table)
 
@@ -143,7 +149,9 @@ case class MergeIntoPaimonDataEvolutionTable(
   }
 
   private def invokeMergeInto(sparkSession: SparkSession): Unit = {
-    val plan = table.newSnapshotReader().read()
+    val snapshotReader = table.newSnapshotReader()
+    pushDownMergePartitionFilter(snapshotReader)
+    val plan = snapshotReader.read()
     val tableSplits: Seq[DataSplit] = plan
       .splits()
       .asScala
@@ -202,6 +210,41 @@ case class MergeIntoPaimonDataEvolutionTable(
     writer.commit(updateCommit ++ insertCommit)
   }
 
+  private def pushDownMergePartitionFilter(snapshotReader: SnapshotReader): 
Unit = {
+    val partitionRowType = table.schema().logicalPartitionType()
+    if (partitionRowType.getFieldCount == 0) {
+      return
+    }
+
+    // matchedCondition comes from MergeIntoTable.mergeCondition, which is the 
MERGE ON condition.
+    val partitionPredicates = getExpressionOnlyRelated(matchedCondition, 
targetTable)
+      .map(splitConjunctivePredicates)
+      .map(extractMergePartitionFilters(_, partitionRowType))
+      .getOrElse(Seq.empty)
+
+    if (partitionPredicates.nonEmpty) {
+      val filter = convertConditionToPaimonPredicate(
+        partitionPredicates.reduce(And),
+        targetRelation.output,
+        rowType,
+        ignorePartialFailure = true)
+      filter.foreach(snapshotReader.withFilter)
+    }
+  }
+
+  private def extractMergePartitionFilters(
+      filters: Seq[Expression],
+      partitionRowType: RowType): Seq[Expression] = {
+    val partitionColumns = partitionRowType.getFieldNames.asScala.toSet
+    filters.filter {
+      f =>
+        f.deterministic &&
+        f.references.forall(attr => 
partitionColumns.exists(_.equalsIgnoreCase(attr.name))) &&
+        !SubqueryExpression.hasSubquery(f) &&
+        f.collect { case _: PythonUDF => true }.isEmpty
+    }
+  }
+
   private def targetRelatedSplits(
       sparkSession: SparkSession,
       tableSplits: Seq[DataSplit],
@@ -213,6 +256,13 @@ case class MergeIntoPaimonDataEvolutionTable(
       return tableSplits
     }
 
+    if (!table.coreOptions().dataEvolutionMergeIntoFilePruning()) {
+      logInfo(
+        "Skip file-level pruning for MergeInto partial column update on 
data-evolution table " +
+          s"${table.name()}.")
+      return tableSplits
+    }
+
     val sourceDss = createDataset(sparkSession, sourceTable)
 
     val firstRowIdsTouched = extractSourceRowIdMapping match {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 008f14ed3e..c92229c1d0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -27,17 +27,21 @@ import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.source.snapshot.SnapshotReader
+import org.apache.paimon.types.RowType
 import org.apache.paimon.types.VectorType.isVectorStoreFile
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils._
 import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
EqualTo, Expression, ExprId, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, 
AttributeReference, EqualTo, Expression, ExprId, Literal, Or, PythonUDF, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -61,7 +65,9 @@ case class MergeIntoPaimonDataEvolutionTable(
     notMatchedActions: Seq[MergeAction],
     notMatchedBySourceActions: Seq[MergeAction])
   extends PaimonLeafRunnableCommand
-  with WithFileStoreTable {
+  with WithFileStoreTable
+  with ExpressionHelper
+  with Logging {
 
   private lazy val writer = PaimonSparkWriter(table)
 
@@ -143,7 +149,9 @@ case class MergeIntoPaimonDataEvolutionTable(
   }
 
   private def invokeMergeInto(sparkSession: SparkSession): Unit = {
-    val plan = table.newSnapshotReader().read()
+    val snapshotReader = table.newSnapshotReader()
+    pushDownMergePartitionFilter(snapshotReader)
+    val plan = snapshotReader.read()
     val tableSplits: Seq[DataSplit] = plan
       .splits()
       .asScala
@@ -207,6 +215,41 @@ case class MergeIntoPaimonDataEvolutionTable(
     writer.commit(updateCommit ++ insertCommit)
   }
 
+  private def pushDownMergePartitionFilter(snapshotReader: SnapshotReader): 
Unit = {
+    val partitionRowType = table.schema().logicalPartitionType()
+    if (partitionRowType.getFieldCount == 0) {
+      return
+    }
+
+    // matchedCondition comes from MergeIntoTable.mergeCondition, which is the 
MERGE ON condition.
+    val partitionPredicates = getExpressionOnlyRelated(matchedCondition, 
targetTable)
+      .map(splitConjunctivePredicates)
+      .map(extractMergePartitionFilters(_, partitionRowType))
+      .getOrElse(Seq.empty)
+
+    if (partitionPredicates.nonEmpty) {
+      val filter = convertConditionToPaimonPredicate(
+        partitionPredicates.reduce(And),
+        targetRelation.output,
+        rowType,
+        ignorePartialFailure = true)
+      filter.foreach(snapshotReader.withFilter)
+    }
+  }
+
+  private def extractMergePartitionFilters(
+      filters: Seq[Expression],
+      partitionRowType: RowType): Seq[Expression] = {
+    val partitionColumns = partitionRowType.getFieldNames.asScala.toSet
+    filters.filter {
+      f =>
+        f.deterministic &&
+        f.references.forall(attr => 
partitionColumns.exists(_.equalsIgnoreCase(attr.name))) &&
+        !SubqueryExpression.hasSubquery(f) &&
+        f.collect { case _: PythonUDF => true }.isEmpty
+    }
+  }
+
   private def targetRelatedSplits(
       sparkSession: SparkSession,
       tableSplits: Seq[DataSplit],
@@ -218,6 +261,13 @@ case class MergeIntoPaimonDataEvolutionTable(
       return tableSplits
     }
 
+    if (!table.coreOptions().dataEvolutionMergeIntoFilePruning()) {
+      logInfo(
+        "Skip file-level pruning for MergeInto partial column update on 
data-evolution table " +
+          s"${table.name()}.")
+      return tableSplits
+    }
+
     val sourceDss = createDataset(sparkSession, sourceTable)
 
     val firstRowIdsTouched = extractSourceRowIdMapping match {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 644eb49847..6728f8cb54 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -19,12 +19,18 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.spark.PaimonMetrics.RESULTED_TABLE_FILES
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.read.PaimonSplitScan
 import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Join, 
LogicalPlan, MergeRows, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Join, 
LogicalPlan, MergeRows, RepartitionByExpression, Sort, SubqueryAlias}
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.paimon.Utils
 import org.apache.spark.sql.util.QueryExecutionListener
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
@@ -34,7 +40,7 @@ import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.DurationInt
 
-abstract class RowTrackingTestBase extends PaimonSparkTestBase {
+abstract class RowTrackingTestBase extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelper {
 
   import testImplicits._
 
@@ -628,6 +634,174 @@ abstract class RowTrackingTestBase extends 
PaimonSparkTestBase {
     }
   }
 
+  Seq(false, true).foreach {
+    filePruning =>
+      test(s"Data Evolution: merge into file pruning: $filePruning") {
+        withSparkSQLConf(
+          "spark.paimon.data-evolution.merge-into.file-pruning" ->
+            filePruning.toString) {
+          withTable("source", "target") {
+            sql("CREATE TABLE source (id INT, b INT, dt STRING)")
+            sql("INSERT INTO source VALUES (1, 100, '2026-05-28'), (3, 300, 
'2026-05-28')")
+
+            sql("""
+                  |CREATE TABLE target (id INT, b INT, c STRING, dt STRING)
+                  |TBLPROPERTIES (
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |PARTITIONED BY (dt)
+                  |""".stripMargin)
+            sql("INSERT INTO target VALUES (1, 10, 'old-1', '2026-05-28'), (2, 
20, 'old-2', '2026-05-28'), (4, 40, 'old-4', '2026-05-29')")
+
+            executeMergeIntoAndAssertFilePruning(
+              """
+                |MERGE INTO target
+                |USING source
+                |ON target.id = source.id AND target.dt = source.dt
+                |WHEN MATCHED THEN UPDATE SET target.b = source.b
+                |WHEN NOT MATCHED THEN INSERT (id, b, c, dt) VALUES (id, b, 
'new', dt)
+                |""".stripMargin,
+              filePruning
+            )
+
+            checkAnswer(
+              sql("SELECT id, b, c, dt FROM target ORDER BY id"),
+              Seq(
+                Row(1, 100, "old-1", "2026-05-28"),
+                Row(2, 20, "old-2", "2026-05-28"),
+                Row(3, 300, "new", "2026-05-28"),
+                Row(4, 40, "old-4", "2026-05-29"))
+            )
+          }
+        }
+      }
+  }
+
+  private def executeMergeIntoAndAssertFilePruning(mergeSql: String, 
filePruning: Boolean): Unit = {
+    @volatile var hasTargetFilePruningJoin = false
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        checkPlan(qe.analyzed)
+      }
+
+      override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {
+        checkPlan(qe.analyzed)
+      }
+
+      private def checkPlan(plan: LogicalPlan): Unit = {
+        if (isTargetFilePruningJoinPlan(plan)) {
+          hasTargetFilePruningJoin = true
+          assert(
+            filePruning,
+            s"File pruning join should be skipped when file pruning is 
disabled: $plan")
+        }
+      }
+    }
+
+    spark.listenerManager.register(listener)
+    try {
+      sql(mergeSql)
+      Utils.waitUntilEventEmpty(spark)
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+
+    if (filePruning) {
+      assert(hasTargetFilePruningJoin, "Expected target file pruning join 
plan.")
+    }
+  }
+
+  private def isTargetFilePruningJoinPlan(plan: LogicalPlan): Boolean = {
+    plan.collectFirst { case _: Deduplicate => true }.nonEmpty &&
+    plan.collectFirst { case _: Join => true }.nonEmpty &&
+    plan.collectFirst {
+      case SubqueryAlias(identifier, _) if identifier.name == "_left" => true
+    }.nonEmpty &&
+    plan.collectFirst { case _: MergeRows => true }.isEmpty
+  }
+
+  test("Data Evolution: merge into skip file pruning push down partition 
filter in on condition") {
+    withSparkSQLConf("spark.paimon.data-evolution.merge-into.file-pruning" -> 
"false") {
+      withTempView("source") {
+        withTable("target") {
+          Seq((1, 100), (2, 200), (3, 300)).toDF("id", 
"b").createOrReplaceTempView("source")
+
+          sql("""
+                |CREATE TABLE target (id INT, b INT, dt STRING)
+                |TBLPROPERTIES (
+                |  'row-tracking.enabled' = 'true',
+                |  'data-evolution.enabled' = 'true')
+                |PARTITIONED BY (dt)
+                |""".stripMargin)
+          sql("""
+                |INSERT INTO target VALUES
+                |  (1, 10, '2026-05-28'),
+                |  (2, 20, '2026-05-29'),
+                |  (3, 30, '2026-05-30')
+                |""".stripMargin)
+
+          val mergeSql =
+            """
+              |MERGE INTO target
+              |USING source
+              |ON target.id = source.id AND target.dt = '2026-05-28'
+              |WHEN MATCHED THEN UPDATE SET target.b = source.b
+              |""".stripMargin
+
+          executeMergeIntoAndAssertPartitionPruned(mergeSql)
+          checkAnswer(
+            sql("SELECT id, b, dt FROM target ORDER BY id"),
+            Seq(Row(1, 100, "2026-05-28"), Row(2, 20, "2026-05-29"), Row(3, 
30, "2026-05-30"))
+          )
+        }
+      }
+    }
+  }
+
+  private def executeMergeIntoAndAssertPartitionPruned(mergeSql: String): Unit 
= {
+    val resultedTableFiles = new 
java.util.concurrent.CopyOnWriteArrayList[Long]()
+
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        checkPlan(qe)
+      }
+
+      override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {
+        checkPlan(qe)
+      }
+
+      private def checkPlan(qe: QueryExecution): Unit = {
+        collect(qe.executedPlan) {
+          case scanExec: BatchScanExec
+              if scanExec.scan.isInstanceOf[PaimonSplitScan] &&
+                scanExec.scan.description().startsWith("PaimonSplitScan: 
[target]") =>
+            val scan = scanExec.scan.asInstanceOf[PaimonSplitScan]
+            metric(scan.reportDriverMetrics(), RESULTED_TABLE_FILES)
+        }.foreach(resultedTableFile => 
resultedTableFiles.add(resultedTableFile))
+      }
+    }
+
+    spark.listenerManager.register(listener)
+    try {
+      sql(mergeSql)
+      Utils.waitUntilEventEmpty(spark)
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+
+    val metrics = resultedTableFiles.asScala
+    assert(metrics.nonEmpty, "Expected target PaimonSplitScan in merge into 
executed plans.")
+    assert(
+      metrics.contains(1),
+      s"Expected target scan to read only one partition file, but got resulted 
table files: " +
+        metrics.mkString(", ")
+    )
+  }
+
+  private def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
+    metrics.find(_.name() == name).get.value()
+  }
+
   test("Data Evolution: merge into table with data-evolution on _ROW_ID") {
     withTable("source", "target") {
       sql(

Reply via email to