This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8c41e8baf0 [spark] Add data-evolution.merge-into.file-pruning option
(#8065)
8c41e8baf0 is described below
commit 8c41e8baf0061806dd0abb5ef2a241afadc7f028
Author: Liurnly <[email protected]>
AuthorDate: Tue Jun 2 15:50:00 2026 +0800
[spark] Add data-evolution.merge-into.file-pruning option (#8065)
Add `data-evolution.merge-into.file-pruning` for MergeInto partial
column update on data-evolution tables. When disabled, this option skips
the file-level pruning step. It is useful when most files in the target
partition are expected to be updated, so the overhead of collecting
touched file IDs outweighs the benefit of pruning untouched files.
When file pruning is skipped, Spark merge into still pushes down
target-table partition filters from the MERGE ON condition to avoid
scanning unrelated partitions.
---
docs/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 15 ++
.../MergeIntoPaimonDataEvolutionTable.scala | 56 ++++++-
.../MergeIntoPaimonDataEvolutionTable.scala | 56 ++++++-
.../paimon/spark/sql/RowTrackingTestBase.scala | 178 ++++++++++++++++++++-
5 files changed, 303 insertions(+), 8 deletions(-)
diff --git a/docs/generated/core_configuration.html
b/docs/generated/core_configuration.html
index ebd8a5aca7..8f7ea2eaae 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -470,6 +470,12 @@ under the License.
<td>Boolean</td>
<td>Whether enable data evolution for row tracking table.</td>
</tr>
+ <tr>
+ <td><h5>data-evolution.merge-into.file-pruning</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>If true, enables the file-level pruning step for MergeInto
partial column update on data-evolution tables. Set this to false when most
files in the target partition are expected to be updated, so that the overhead
of collecting touched file IDs outweighs the benefit of pruning untouched
files.</td>
+ </tr>
<tr>
<td><h5>data-file.external-paths</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f09a6edb4a..03140e9ecc 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2238,6 +2238,17 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether enable data evolution for row
tracking table.");
+    /**
+     * Toggles the file-level pruning step of MergeInto partial column update on data-evolution
+     * tables. Defaults to {@code true}; read through {@code dataEvolutionMergeIntoFilePruning()}.
+     */
+ public static final ConfigOption<Boolean>
DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING =
+ key("data-evolution.merge-into.file-pruning")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "If true, enables the file-level pruning step for
MergeInto partial column "
+ + "update on data-evolution tables. "
+ + "Set this to false when most files in
the target partition are expected "
+ + "to be updated, so that the overhead of
collecting touched file IDs "
+ + "outweighs the benefit of pruning
untouched files.");
+
public static final ConfigOption<Boolean> BLOB_COMPACTION_ENABLED =
key("blob-compaction.enabled")
.booleanType()
@@ -3744,6 +3755,10 @@ public class CoreOptions implements Serializable {
return options.get(DATA_EVOLUTION_ENABLED);
}
+    /**
+     * Returns the value of {@link #DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING}: whether MergeInto
+     * partial column update on data-evolution tables runs its file-level pruning step.
+     */
+ public boolean dataEvolutionMergeIntoFilePruning() {
+ return options.get(DATA_EVOLUTION_MERGE_INTO_FILE_PRUNING);
+ }
+
public boolean blobCompactionEnabled() {
return options.get(BLOB_COMPACTION_ENABLED);
}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index c5f9e02715..9ce5b87386 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -27,16 +27,20 @@ import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.source.snapshot.SnapshotReader
+import org.apache.paimon.types.RowType
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils._
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
EqualTo, Expression, ExprId, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And,
AttributeReference, EqualTo, Expression, ExprId, Literal, PythonUDF,
SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral,
TrueLiteral}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -60,7 +64,9 @@ case class MergeIntoPaimonDataEvolutionTable(
notMatchedActions: Seq[MergeAction],
notMatchedBySourceActions: Seq[MergeAction])
extends PaimonLeafRunnableCommand
- with WithFileStoreTable {
+ with WithFileStoreTable
+ with ExpressionHelper
+ with Logging {
private lazy val writer = PaimonSparkWriter(table)
@@ -143,7 +149,9 @@ case class MergeIntoPaimonDataEvolutionTable(
}
private def invokeMergeInto(sparkSession: SparkSession): Unit = {
- val plan = table.newSnapshotReader().read()
+ val snapshotReader = table.newSnapshotReader()
+ pushDownMergePartitionFilter(snapshotReader)
+ val plan = snapshotReader.read()
val tableSplits: Seq[DataSplit] = plan
.splits()
.asScala
@@ -202,6 +210,41 @@ case class MergeIntoPaimonDataEvolutionTable(
writer.commit(updateCommit ++ insertCommit)
}
+  /**
+   * Pushes target-table partition predicates that appear in the MERGE ON condition down to
+   * `snapshotReader`, so partitions that can never satisfy the ON condition are not planned.
+   *
+   * Only conjuncts accepted by `extractMergePartitionFilters` are used. The push-down is skipped
+   * when the table is unpartitioned, and when WHEN NOT MATCHED BY SOURCE actions are present:
+   * those actions apply to target rows that fail the ON condition, which includes every row of a
+   * partition this filter would prune.
+   *
+   * @param snapshotReader reader of the target table; receives the converted partition filter
+   *                       through `withFilter` when one could be built
+   */
+  private def pushDownMergePartitionFilter(snapshotReader: SnapshotReader): Unit = {
+    val partitionRowType = table.schema().logicalPartitionType()
+    val canPrunePartitions =
+      partitionRowType.getFieldCount > 0 && notMatchedBySourceActions.isEmpty
+
+    if (canPrunePartitions) {
+      // matchedCondition comes from MergeIntoTable.mergeCondition, which is the MERGE ON condition.
+      val partitionPredicates = getExpressionOnlyRelated(matchedCondition, targetTable)
+        .map(splitConjunctivePredicates)
+        .map(extractMergePartitionFilters(_, partitionRowType))
+        .getOrElse(Seq.empty)
+
+      if (partitionPredicates.nonEmpty) {
+        // ignorePartialFailure = true: presumably unconvertible conjuncts are dropped instead of
+        // failing the merge; dropping a conjunct can only widen the scan, never over-prune it.
+        val filter = convertConditionToPaimonPredicate(
+          partitionPredicates.reduce(And),
+          targetRelation.output,
+          rowType,
+          ignorePartialFailure = true)
+        filter.foreach(snapshotReader.withFilter)
+      }
+    }
+  }
+
+  /**
+   * Selects the conjuncts that can be evaluated against partition values alone.
+   *
+   * A conjunct qualifies when it is deterministic, every attribute it references names a
+   * partition column (compared case-insensitively), and it contains neither a subquery nor a
+   * Python UDF.
+   *
+   * @param filters          conjuncts of the (target-only part of the) MERGE ON condition
+   * @param partitionRowType partition row type of the target table
+   * @return the qualifying conjuncts, in their original order
+   */
+  private def extractMergePartitionFilters(
+      filters: Seq[Expression],
+      partitionRowType: RowType): Seq[Expression] = {
+    val partitionNames = partitionRowType.getFieldNames.asScala.toSet
+
+    def isPartitionColumn(columnName: String): Boolean =
+      partitionNames.exists(_.equalsIgnoreCase(columnName))
+
+    def containsPythonUdf(expr: Expression): Boolean =
+      expr.collect { case _: PythonUDF => true }.nonEmpty
+
+    filters.filter {
+      candidate =>
+        candidate.deterministic &&
+        candidate.references.forall(ref => isPartitionColumn(ref.name)) &&
+        !SubqueryExpression.hasSubquery(candidate) &&
+        !containsPythonUdf(candidate)
+    }
+  }
+
private def targetRelatedSplits(
sparkSession: SparkSession,
tableSplits: Seq[DataSplit],
@@ -213,6 +256,13 @@ case class MergeIntoPaimonDataEvolutionTable(
return tableSplits
}
+ if (!table.coreOptions().dataEvolutionMergeIntoFilePruning()) {
+ logInfo(
+ "Skip file-level pruning for MergeInto partial column update on
data-evolution table " +
+ s"${table.name()}.")
+ return tableSplits
+ }
+
val sourceDss = createDataset(sparkSession, sourceTable)
val firstRowIdsTouched = extractSourceRowIdMapping match {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 008f14ed3e..c92229c1d0 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -27,17 +27,21 @@ import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.table.source.snapshot.SnapshotReader
+import org.apache.paimon.types.RowType
import org.apache.paimon.types.VectorType.isVectorStoreFile
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils._
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
EqualTo, Expression, ExprId, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, And,
AttributeReference, EqualTo, Expression, ExprId, Literal, Or, PythonUDF,
SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral,
TrueLiteral}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -61,7 +65,9 @@ case class MergeIntoPaimonDataEvolutionTable(
notMatchedActions: Seq[MergeAction],
notMatchedBySourceActions: Seq[MergeAction])
extends PaimonLeafRunnableCommand
- with WithFileStoreTable {
+ with WithFileStoreTable
+ with ExpressionHelper
+ with Logging {
private lazy val writer = PaimonSparkWriter(table)
@@ -143,7 +149,9 @@ case class MergeIntoPaimonDataEvolutionTable(
}
private def invokeMergeInto(sparkSession: SparkSession): Unit = {
- val plan = table.newSnapshotReader().read()
+ val snapshotReader = table.newSnapshotReader()
+ pushDownMergePartitionFilter(snapshotReader)
+ val plan = snapshotReader.read()
val tableSplits: Seq[DataSplit] = plan
.splits()
.asScala
@@ -207,6 +215,41 @@ case class MergeIntoPaimonDataEvolutionTable(
writer.commit(updateCommit ++ insertCommit)
}
+  /**
+   * Pushes target-table partition predicates that appear in the MERGE ON condition down to
+   * `snapshotReader`, so partitions that can never satisfy the ON condition are not planned.
+   *
+   * Only conjuncts accepted by `extractMergePartitionFilters` are used. The push-down is skipped
+   * when the table is unpartitioned, and when WHEN NOT MATCHED BY SOURCE actions are present:
+   * those actions apply to target rows that fail the ON condition, which includes every row of a
+   * partition this filter would prune.
+   *
+   * @param snapshotReader reader of the target table; receives the converted partition filter
+   *                       through `withFilter` when one could be built
+   */
+  private def pushDownMergePartitionFilter(snapshotReader: SnapshotReader): Unit = {
+    val partitionRowType = table.schema().logicalPartitionType()
+    val canPrunePartitions =
+      partitionRowType.getFieldCount > 0 && notMatchedBySourceActions.isEmpty
+
+    if (canPrunePartitions) {
+      // matchedCondition comes from MergeIntoTable.mergeCondition, which is the MERGE ON condition.
+      val partitionPredicates = getExpressionOnlyRelated(matchedCondition, targetTable)
+        .map(splitConjunctivePredicates)
+        .map(extractMergePartitionFilters(_, partitionRowType))
+        .getOrElse(Seq.empty)
+
+      if (partitionPredicates.nonEmpty) {
+        // ignorePartialFailure = true: presumably unconvertible conjuncts are dropped instead of
+        // failing the merge; dropping a conjunct can only widen the scan, never over-prune it.
+        val filter = convertConditionToPaimonPredicate(
+          partitionPredicates.reduce(And),
+          targetRelation.output,
+          rowType,
+          ignorePartialFailure = true)
+        filter.foreach(snapshotReader.withFilter)
+      }
+    }
+  }
+
+  /**
+   * Selects the conjuncts that can be evaluated against partition values alone.
+   *
+   * A conjunct qualifies when it is deterministic, every attribute it references names a
+   * partition column (compared case-insensitively), and it contains neither a subquery nor a
+   * Python UDF.
+   *
+   * @param filters          conjuncts of the (target-only part of the) MERGE ON condition
+   * @param partitionRowType partition row type of the target table
+   * @return the qualifying conjuncts, in their original order
+   */
+  private def extractMergePartitionFilters(
+      filters: Seq[Expression],
+      partitionRowType: RowType): Seq[Expression] = {
+    val partitionNames = partitionRowType.getFieldNames.asScala.toSet
+
+    def isPartitionColumn(columnName: String): Boolean =
+      partitionNames.exists(_.equalsIgnoreCase(columnName))
+
+    def containsPythonUdf(expr: Expression): Boolean =
+      expr.collect { case _: PythonUDF => true }.nonEmpty
+
+    filters.filter {
+      candidate =>
+        candidate.deterministic &&
+        candidate.references.forall(ref => isPartitionColumn(ref.name)) &&
+        !SubqueryExpression.hasSubquery(candidate) &&
+        !containsPythonUdf(candidate)
+    }
+  }
+
private def targetRelatedSplits(
sparkSession: SparkSession,
tableSplits: Seq[DataSplit],
@@ -218,6 +261,13 @@ case class MergeIntoPaimonDataEvolutionTable(
return tableSplits
}
+ if (!table.coreOptions().dataEvolutionMergeIntoFilePruning()) {
+ logInfo(
+ "Skip file-level pruning for MergeInto partial column update on
data-evolution table " +
+ s"${table.name()}.")
+ return tableSplits
+ }
+
val sourceDss = createDataset(sparkSession, sourceTable)
val firstRowIdsTouched = extractSourceRowIdMapping match {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 644eb49847..6728f8cb54 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -19,12 +19,18 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.spark.PaimonMetrics.RESULTED_TABLE_FILES
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.read.PaimonSplitScan
import org.apache.paimon.table.source.DataSplit
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Join,
LogicalPlan, MergeRows, RepartitionByExpression, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Join,
LogicalPlan, MergeRows, RepartitionByExpression, Sort, SubqueryAlias}
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.paimon.Utils
import org.apache.spark.sql.util.QueryExecutionListener
import java.util.concurrent.{CountDownLatch, TimeUnit}
@@ -34,7 +40,7 @@ import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
-abstract class RowTrackingTestBase extends PaimonSparkTestBase {
+abstract class RowTrackingTestBase extends PaimonSparkTestBase with
AdaptiveSparkPlanHelper {
import testImplicits._
@@ -628,6 +634,174 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase {
}
}
+ // Runs the same MERGE INTO once with data-evolution.merge-into.file-pruning disabled and once
+ // enabled. The helper checks whether the file-pruning join query was executed, and the final
+ // checkAnswer verifies that both modes leave the target table with identical contents.
+ Seq(false, true).foreach {
+ filePruning =>
+ test(s"Data Evolution: merge into file pruning: $filePruning") {
+ withSparkSQLConf(
+ "spark.paimon.data-evolution.merge-into.file-pruning" ->
+ filePruning.toString) {
+ withTable("source", "target") {
+ sql("CREATE TABLE source (id INT, b INT, dt STRING)")
+ sql("INSERT INTO source VALUES (1, 100, '2026-05-28'), (3, 300,
'2026-05-28')")
+
+ sql("""
+ |CREATE TABLE target (id INT, b INT, c STRING, dt STRING)
+ |TBLPROPERTIES (
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ |PARTITIONED BY (dt)
+ |""".stripMargin)
+ sql("INSERT INTO target VALUES (1, 10, 'old-1', '2026-05-28'), (2,
20, 'old-2', '2026-05-28'), (4, 40, 'old-4', '2026-05-29')")
+
+ // id 1 is matched (b updated), id 3 is inserted, ids 2 and 4 are untouched.
+ executeMergeIntoAndAssertFilePruning(
+ """
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id AND target.dt = source.dt
+ |WHEN MATCHED THEN UPDATE SET target.b = source.b
+ |WHEN NOT MATCHED THEN INSERT (id, b, c, dt) VALUES (id, b,
'new', dt)
+ |""".stripMargin,
+ filePruning
+ )
+
+ checkAnswer(
+ sql("SELECT id, b, c, dt FROM target ORDER BY id"),
+ Seq(
+ Row(1, 100, "old-1", "2026-05-28"),
+ Row(2, 20, "old-2", "2026-05-28"),
+ Row(3, 300, "new", "2026-05-28"),
+ Row(4, 40, "old-4", "2026-05-29"))
+ )
+ }
+ }
+ }
+ }
+
+  /**
+   * Runs `mergeSql` and checks whether the target file-pruning join query was executed.
+   *
+   * Spark's listener bus catches and logs exceptions thrown from QueryExecutionListener
+   * callbacks instead of propagating them, so an assertion inside the listener can never fail
+   * the test. The listener therefore only records matching plans, and both the positive and the
+   * negative expectation are asserted on the test thread after the listener bus has drained.
+   *
+   * @param mergeSql    the MERGE INTO statement to execute
+   * @param filePruning when true, at least one file-pruning join plan must be observed;
+   *                    when false, none may be observed
+   */
+  private def executeMergeIntoAndAssertFilePruning(mergeSql: String, filePruning: Boolean): Unit = {
+    // Written from the listener-bus thread, read from the test thread.
+    val filePruningJoinPlans = new java.util.concurrent.CopyOnWriteArrayList[LogicalPlan]()
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+        recordPlan(qe.analyzed)
+      }
+
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+        recordPlan(qe.analyzed)
+      }
+
+      private def recordPlan(plan: LogicalPlan): Unit = {
+        if (isTargetFilePruningJoinPlan(plan)) {
+          filePruningJoinPlans.add(plan)
+        }
+      }
+    }
+
+    spark.listenerManager.register(listener)
+    try {
+      sql(mergeSql)
+      Utils.waitUntilEventEmpty(spark)
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+
+    if (filePruning) {
+      assert(!filePruningJoinPlans.isEmpty, "Expected target file pruning join plan.")
+    } else {
+      assert(
+        filePruningJoinPlans.isEmpty,
+        "File pruning join should be skipped when file pruning is disabled: " +
+          filePruningJoinPlans.asScala.mkString("\n"))
+    }
+  }
+
+  /**
+   * Heuristically recognises the file-pruning query: the plan must contain a Deduplicate, a
+   * Join and a SubqueryAlias named `_left`, and must not contain a MergeRows node.
+   * Conditions are evaluated left to right and stop at the first one that fails.
+   */
+  private def isTargetFilePruningJoinPlan(plan: LogicalPlan): Boolean = {
+    def hasNode(matches: LogicalPlan => Boolean): Boolean = plan.find(matches).isDefined
+
+    val isLeftAlias: LogicalPlan => Boolean = {
+      case SubqueryAlias(identifier, _) => identifier.name == "_left"
+      case _ => false
+    }
+
+    hasNode(_.isInstanceOf[Deduplicate]) &&
+    hasNode(_.isInstanceOf[Join]) &&
+    hasNode(isLeftAlias) &&
+    !hasNode(_.isInstanceOf[MergeRows])
+  }
+
+ // With file pruning disabled, the constant partition predicate in the ON condition
+ // (target.dt = '2026-05-28') should still be pushed down: the helper expects a target scan
+ // that reports a single resulted table file, and rows of other partitions must stay unchanged.
+ test("Data Evolution: merge into skip file pruning push down partition
filter in on condition") {
+ withSparkSQLConf("spark.paimon.data-evolution.merge-into.file-pruning" ->
"false") {
+ withTempView("source") {
+ withTable("target") {
+ Seq((1, 100), (2, 200), (3, 300)).toDF("id",
"b").createOrReplaceTempView("source")
+
+ sql("""
+ |CREATE TABLE target (id INT, b INT, dt STRING)
+ |TBLPROPERTIES (
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ |PARTITIONED BY (dt)
+ |""".stripMargin)
+ // One row per partition, so each partition is written by this single INSERT.
+ sql("""
+ |INSERT INTO target VALUES
+ | (1, 10, '2026-05-28'),
+ | (2, 20, '2026-05-29'),
+ | (3, 30, '2026-05-30')
+ |""".stripMargin)
+
+ val mergeSql =
+ """
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id AND target.dt = '2026-05-28'
+ |WHEN MATCHED THEN UPDATE SET target.b = source.b
+ |""".stripMargin
+
+ executeMergeIntoAndAssertPartitionPruned(mergeSql)
+ checkAnswer(
+ sql("SELECT id, b, dt FROM target ORDER BY id"),
+ Seq(Row(1, 100, "2026-05-28"), Row(2, 20, "2026-05-29"), Row(3,
30, "2026-05-30"))
+ )
+ }
+ }
+ }
+ }
+
+  /**
+   * Runs `mergeSql` while collecting the RESULTED_TABLE_FILES driver metric of every
+   * `PaimonSplitScan` over `target` found in the executed plans, then asserts that at least one
+   * such scan was seen and that at least one of them reported exactly one resulted table file.
+   */
+  private def executeMergeIntoAndAssertPartitionPruned(mergeSql: String): Unit = {
+    // Filled from the listener-bus thread, read on the test thread afterwards.
+    val fileCounts = new java.util.concurrent.CopyOnWriteArrayList[Long]()
+
+    def recordTargetScans(qe: QueryExecution): Unit = {
+      val perScanFileCounts = collect(qe.executedPlan) {
+        case batchScan: BatchScanExec
+            if batchScan.scan.isInstanceOf[PaimonSplitScan] &&
+              batchScan.scan.description().startsWith("PaimonSplitScan: [target]") =>
+          val splitScan = batchScan.scan.asInstanceOf[PaimonSplitScan]
+          metric(splitScan.reportDriverMetrics(), RESULTED_TABLE_FILES)
+      }
+      perScanFileCounts.foreach(count => fileCounts.add(count))
+    }
+
+    val listener = new QueryExecutionListener {
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
+        recordTargetScans(qe)
+
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
+        recordTargetScans(qe)
+    }
+
+    spark.listenerManager.register(listener)
+    try {
+      sql(mergeSql)
+      Utils.waitUntilEventEmpty(spark)
+    } finally {
+      spark.listenerManager.unregister(listener)
+    }
+
+    val observed = fileCounts.asScala
+    assert(observed.nonEmpty, "Expected target PaimonSplitScan in merge into executed plans.")
+    assert(
+      observed.contains(1),
+      "Expected target scan to read only one partition file, but got resulted table files: " +
+        observed.mkString(", ")
+    )
+  }
+
+  /**
+   * Returns the value of the metric called `name` in `metrics`.
+   *
+   * @throws NoSuchElementException if no metric with that name is reported; the message lists
+   *                                the names that were reported
+   */
+  private def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
+    metrics
+      .find(_.name() == name)
+      .map(_.value())
+      .getOrElse(throw new NoSuchElementException(
+        s"Metric $name not found among: ${metrics.map(_.name()).mkString(", ")}"))
+  }
+
test("Data Evolution: merge into table with data-evolution on _ROW_ID") {
withTable("source", "target") {
sql(