This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bdcd3ecda1 [spark] Avoid multiple scan in MergeIntoPaimonDataEvolutionTable (#7116)
bdcd3ecda1 is described below

commit bdcd3ecda15cf0ccb50cb27583d1106734a39936
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jan 26 15:26:34 2026 +0800

    [spark] Avoid multiple scan in MergeIntoPaimonDataEvolutionTable (#7116)
---
 .../org/apache/paimon/table/source/DataSplit.java  |  23 +++
 .../MergeIntoPaimonDataEvolutionTable.scala        | 164 ++++++++++-----------
 .../paimon/spark/sql/RowTrackingTestBase.scala     |   2 -
 3 files changed, 104 insertions(+), 85 deletions(-)
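
Before this change, MergeIntoPaimonDataEvolutionTable issued up to three separate manifest scans per merge: one to collect the first row ids of non-blob data files, one to build the blob-file mapping, and one more to re-read the splits whose first row ids were touched. The commit instead reads the snapshot's splits once and derives all of the above from that in-memory list, narrowing each split with the new DataSplit#filterDataFile. Below is a minimal Scala sketch of the pattern, distilled from the patch; the method name splitsTouchingFirstRowIds and the touched set are illustrative, not part of the change.

    import scala.collection.JavaConverters._

    import org.apache.paimon.io.DataFileMeta
    import org.apache.paimon.table.FileStoreTable
    import org.apache.paimon.table.source.DataSplit

    // Illustrative sketch only: scan the snapshot once, then narrow each split in
    // memory instead of issuing another manifest scan per lookup.
    def splitsTouchingFirstRowIds(table: FileStoreTable, touched: Set[Long]): Seq[DataSplit] = {
      // Single manifest scan; everything below works on this in-memory list.
      val tableSplits: Seq[DataSplit] = table
        .newSnapshotReader()
        .read()
        .splits()
        .asScala
        .map(_.asInstanceOf[DataSplit])
        .toSeq

      // filterDataFile (added by this commit) keeps only the data files matching the
      // predicate, keeps their deletion files aligned, and returns Optional.empty()
      // when nothing in the split matches.
      tableSplits
        .map(
          split =>
            split.filterDataFile(
              (file: DataFileMeta) =>
                file.firstRowId() != null && touched.contains(file.firstRowId())))
        .filter(_.isPresent)
        .map(_.get())
    }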

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index cbd2e3086f..7775422fc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
@@ -295,6 +296,28 @@ public class DataSplit implements Split {
         return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
     }
 
+    public Optional<DataSplit> filterDataFile(Predicate<DataFileMeta> filter) {
+        List<DataFileMeta> filtered = new ArrayList<>();
+        List<DeletionFile> filteredDeletion = dataDeletionFiles == null ? null : new ArrayList<>();
+        for (int i = 0; i < dataFiles.size(); i++) {
+            DataFileMeta file = dataFiles.get(i);
+            if (filter.test(file)) {
+                filtered.add(file);
+                if (filteredDeletion != null) {
+                    filteredDeletion.add(dataDeletionFiles.get(i));
+                }
+            }
+        }
+        if (filtered.isEmpty()) {
+            return Optional.empty();
+        }
+        DataSplit split = new DataSplit();
+        split.assign(this);
+        split.dataFiles = filtered;
+        split.dataDeletionFiles = filteredDeletion;
+        return Optional.of(split);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 545bf36fc1..3b1a1ec49a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -28,7 +28,6 @@ import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
@@ -97,45 +96,6 @@ case class MergeIntoPaimonDataEvolutionTable(
     columns.toSet
   }
 
-  private val firstRowIds: immutable.IndexedSeq[Long] = table
-    .store()
-    .newScan()
-    .withManifestEntryFilter(
-      entry =>
-        entry.file().firstRowId() != null && (!isBlobFile(
-          entry
-            .file()
-            .fileName())))
-    .plan()
-    .files()
-    .asScala
-    .map(file => file.file().firstRowId().asInstanceOf[Long])
-    .distinct
-    .sorted
-    .toIndexedSeq
-
-  private val firstRowIdToBlobFirstRowIds = {
-    val map = new mutable.HashMap[Long, List[Long]]()
-    val files = table
-      .store()
-      .newScan()
-      .withManifestEntryFilter(entry => isBlobFile(entry.file().fileName()))
-      .plan()
-      .files()
-      .asScala
-      .sortBy(f => f.file().firstRowId())
-
-    for (file <- files) {
-      val firstRowId = file.file().firstRowId().asInstanceOf[Long]
-      val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
-      map.update(
-        firstIdInNormalFile,
-        map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
-      )
-    }
-    map
-  }
-
   /**
    * Self-Merge pattern:
    * {{{
@@ -171,27 +131,63 @@ case class MergeIntoPaimonDataEvolutionTable(
       "NOT MATCHED BY SOURCE are not supported."
   )
 
-  lazy val targetRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(targetTable)
+  private lazy val targetRelation: DataSourceV2Relation =
+    PaimonRelation.getPaimonRelation(targetTable)
 
   lazy val tableSchema: StructType = v2Table.schema
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Avoid that more than one source rows match the same target row.
     val commitMessages = invokeMergeInto(sparkSession)
-    writer.commit(commitMessages.toSeq)
+    writer.commit(commitMessages)
     Seq.empty[Row]
   }
 
   private def invokeMergeInto(sparkSession: SparkSession): Seq[CommitMessage] = {
+    val tableSplits: Seq[DataSplit] = table
+      .newSnapshotReader()
+      .read()
+      .splits()
+      .asScala
+      .map(_.asInstanceOf[DataSplit])
+      .toSeq
+
+    val firstRowIds: immutable.IndexedSeq[Long] = tableSplits
+      .flatMap(_.dataFiles().asScala)
+      .filter(file => file.firstRowId() != null && !isBlobFile(file.fileName()))
+      .map(file => file.firstRowId().asInstanceOf[Long])
+      .distinct
+      .sorted
+      .toIndexedSeq
+
+    val firstRowIdToBlobFirstRowIds: Map[Long, List[Long]] = {
+      val map = new mutable.HashMap[Long, List[Long]]()
+      val files = tableSplits
+        .flatMap(_.dataFiles().asScala)
+        .filter(file => isBlobFile(file.fileName()))
+        .sortBy(f => f.firstRowId())
+
+      for (file <- files) {
+        val firstRowId = file.firstRowId().asInstanceOf[Long]
+        val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
+        map.update(
+          firstIdInNormalFile,
+          map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
+        )
+      }
+      map.toMap
+    }
+
     // step 1: find the related data split, make it target file plan
-    val dataSplits: Seq[DataSplit] = targetRelatedSplits(sparkSession)
+    val dataSplits: Seq[DataSplit] =
+      targetRelatedSplits(sparkSession, tableSplits, firstRowIds, firstRowIdToBlobFirstRowIds)
     val touchedFileTargetRelation =
-      createNewScanPlan(dataSplits.toSeq, targetRelation)
+      createNewScanPlan(dataSplits, targetRelation)
 
     // step 2: invoke update action
     val updateCommit =
       if (matchedActions.nonEmpty) {
-        val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation)
+        val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation, firstRowIds)
         checkUpdateResult(updateResult)
       } else Nil
 
@@ -204,17 +200,15 @@ case class MergeIntoPaimonDataEvolutionTable(
     updateCommit ++ insertCommit
   }
 
-  private def targetRelatedSplits(sparkSession: SparkSession): Seq[DataSplit] = {
+  private def targetRelatedSplits(
+      sparkSession: SparkSession,
+      tableSplits: Seq[DataSplit],
+      firstRowIds: immutable.IndexedSeq[Long],
+      firstRowIdToBlobFirstRowIds: Map[Long, List[Long]]): Seq[DataSplit] = {
     // Self-Merge shortcut:
     // In Self-Merge mode, every row in the table may be updated, so we scan all splits.
     if (isSelfMergeOnRowId) {
-      return table
-        .newSnapshotReader()
-        .read()
-        .splits()
-        .asScala
-        .map(_.asInstanceOf[DataSplit])
-        .toSeq
+      return tableSplits
     }
 
     val sourceDss = createDataset(sparkSession, sourceTable)
@@ -222,7 +216,12 @@ case class MergeIntoPaimonDataEvolutionTable(
     val firstRowIdsTouched = extractSourceRowIdMapping match {
       case Some(sourceRowIdAttr) =>
         // Shortcut: Directly get _FIRST_ROW_IDs from the source table.
-        findRelatedFirstRowIds(sourceDss, sparkSession, sourceRowIdAttr.name).toSet
+        findRelatedFirstRowIds(
+          sourceDss,
+          sparkSession,
+          firstRowIds,
+          firstRowIdToBlobFirstRowIds,
+          sourceRowIdAttr.name).toSet
 
       case None =>
         // Perform the full join to find related _FIRST_ROW_IDs.
@@ -230,25 +229,25 @@ case class MergeIntoPaimonDataEvolutionTable(
         findRelatedFirstRowIds(
           targetDss.alias("_left").join(sourceDss, toColumn(matchedCondition), "inner"),
           sparkSession,
-          "_left." + ROW_ID_NAME).toSet
+          firstRowIds,
+          firstRowIdToBlobFirstRowIds,
+          "_left." + ROW_ID_NAME
+        ).toSet
     }
 
-    table
-      .newSnapshotReader()
-      .withManifestEntryFilter(
-        entry =>
-          entry.file().firstRowId() != null && firstRowIdsTouched.contains(
-            entry.file().firstRowId()))
-      .read()
-      .splits()
-      .asScala
-      .map(_.asInstanceOf[DataSplit])
-      .toSeq
+    tableSplits
+      .map(
+        split =>
+          split.filterDataFile(
+            file => file.firstRowId() != null && firstRowIdsTouched.contains(file.firstRowId())))
+      .filter(optional => optional.isPresent)
+      .map(_.get())
   }
 
   private def updateActionInvoke(
       sparkSession: SparkSession,
-      touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = {
+      touchedFileTargetRelation: DataSourceV2Relation,
+      firstRowIds: immutable.IndexedSeq[Long]): Seq[CommitMessage] = {
     val mergeFields = extractFields(matchedCondition)
     val allFields = mutable.SortedSet.empty[AttributeReference](
       (o1, o2) => {
@@ -343,7 +342,7 @@ case class MergeIntoPaimonDataEvolutionTable(
         child = readPlan
       )
 
-      val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
+      val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
       assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
       withFirstRowId
     } else {
@@ -381,7 +380,7 @@ case class MergeIntoPaimonDataEvolutionTable(
         output = output,
         child = joinPlan
       )
-      val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
+      val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
       assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
       withFirstRowId
         .repartition(col(FIRST_ROW_ID_NAME))
@@ -496,7 +495,6 @@ case class MergeIntoPaimonDataEvolutionTable(
       .newIndexFileHandler()
       .scan(latestSnapshot.get(), filter)
       .asScala
-      .toSeq
 
     if (affectedIndexEntries.isEmpty) {
       updateCommit
@@ -533,19 +531,19 @@ case class MergeIntoPaimonDataEvolutionTable(
   private def findRelatedFirstRowIds(
       dataset: Dataset[Row],
       sparkSession: SparkSession,
+      firstRowIds: immutable.IndexedSeq[Long],
+      firstRowIdToBlobFirstRowIds: Map[Long, List[Long]],
       identifier: String): Array[Long] = {
     import sparkSession.implicits._
-    val firstRowIdsFinal = firstRowIds
-    val firstRowIdToBlobFirstRowIdsFinal = firstRowIdToBlobFirstRowIds
-    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
+    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
     dataset
       .select(firstRowIdUdf(col(identifier)))
       .distinct()
       .as[Long]
       .flatMap(
         f => {
-          if (firstRowIdToBlobFirstRowIdsFinal.contains(f)) {
-            firstRowIdToBlobFirstRowIdsFinal(f)
+          if (firstRowIdToBlobFirstRowIds.contains(f)) {
+            firstRowIdToBlobFirstRowIds(f)
           } else {
             Seq(f)
           }
@@ -572,38 +570,38 @@ case class MergeIntoPaimonDataEvolutionTable(
   private def attribute(name: String, plan: LogicalPlan) =
     plan.output.find(attr => resolver(name, attr.name)).get
 
-  private def addFirstRowId(sparkSession: SparkSession, plan: LogicalPlan): Dataset[Row] = {
+  private def addFirstRowId(
+      sparkSession: SparkSession,
+      plan: LogicalPlan,
+      firstRowIds: immutable.IndexedSeq[Long]): Dataset[Row] = {
     assert(plan.output.exists(_.name.equals(ROW_ID_NAME)))
-    val firstRowIdsFinal = firstRowIds
-    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
+    val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
     val firstRowIdColumn = firstRowIdUdf(col(ROW_ID_NAME))
     createDataset(sparkSession, plan).withColumn(FIRST_ROW_ID_NAME, firstRowIdColumn)
   }
 }
 
 object MergeIntoPaimonDataEvolutionTable {
+
   final private val ROW_FROM_SOURCE = "__row_from_source"
   final private val ROW_FROM_TARGET = "__row_from_target"
   final private val ROW_ID_NAME = "_ROW_ID"
   final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID";
-  final private val redundantColumns =
-    Seq(PaimonMetadataColumn.ROW_ID.toAttribute)
 
-  def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
+  private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
     if (indexed.isEmpty) {
       throw new IllegalArgumentException("The input sorted sequence is empty.")
     }
 
     indexed.search(value) match {
       case Found(foundIndex) => indexed(foundIndex)
-      case InsertionPoint(insertionIndex) => {
+      case InsertionPoint(insertionIndex) =>
         if (insertionIndex == 0) {
           throw new IllegalArgumentException(
             s"Value $value is less than the first element in the sorted 
sequence.")
         } else {
           indexed(insertionIndex - 1)
         }
-      }
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 36feb56f82..e227604759 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -18,9 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.Snapshot.CommitKind
-import org.apache.paimon.format.FileFormat
 import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
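
The row-id mapping that the refactor relocates is otherwise unchanged: floorBinarySearch maps a _ROW_ID to the firstRowId of the non-blob data file containing it (the greatest firstRowId less than or equal to the row id), and firstRowIdToBlobFirstRowIds then expands that id to the blob files covering the same row range. A small, self-contained Scala sketch of the floor lookup follows, with illustrative names and values that are not from the patch.

    import scala.collection.Searching._

    // Floor lookup over a sorted, distinct sequence: greatest element <= value.
    def floorOf(sorted: IndexedSeq[Long], value: Long): Long =
      sorted.search(value) match {
        case Found(i)          => sorted(i)
        case InsertionPoint(0) => sys.error(s"$value precedes ${sorted.head}")
        case InsertionPoint(i) => sorted(i - 1)
      }

    // Suppose non-blob data files start at row ids 0, 100 and 250:
    val firstRowIds = IndexedSeq(0L, 100L, 250L)
    assert(floorOf(firstRowIds, 137L) == 100L) // row 137 lives in the file starting at row id 100
    assert(floorOf(firstRowIds, 250L) == 250L) // an exact boundary maps to itself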
