This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bdcd3ecda1 [spark] Avoid multiple scan in MergeIntoPaimonDataEvolutionTable (#7116)
bdcd3ecda1 is described below
commit bdcd3ecda15cf0ccb50cb27583d1106734a39936
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jan 26 15:26:34 2026 +0800
[spark] Avoid multiple scan in MergeIntoPaimonDataEvolutionTable (#7116)
---
.../org/apache/paimon/table/source/DataSplit.java | 23 +++
.../MergeIntoPaimonDataEvolutionTable.scala | 164 ++++++++++-----------
.../paimon/spark/sql/RowTrackingTestBase.scala | 2 -
3 files changed, 104 insertions(+), 85 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index cbd2e3086f..7775422fc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
@@ -295,6 +296,28 @@ public class DataSplit implements Split {
return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
}
+ public Optional<DataSplit> filterDataFile(Predicate<DataFileMeta> filter) {
+ List<DataFileMeta> filtered = new ArrayList<>();
+ List<DeletionFile> filteredDeletion = dataDeletionFiles == null ? null : new ArrayList<>();
+ for (int i = 0; i < dataFiles.size(); i++) {
+ DataFileMeta file = dataFiles.get(i);
+ if (filter.test(file)) {
+ filtered.add(file);
+ if (filteredDeletion != null) {
+ filteredDeletion.add(dataDeletionFiles.get(i));
+ }
+ }
+ }
+ if (filtered.isEmpty()) {
+ return Optional.empty();
+ }
+ DataSplit split = new DataSplit();
+ split.assign(this);
+ split.dataFiles = filtered;
+ split.dataDeletionFiles = filteredDeletion;
+ return Optional.of(split);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 545bf36fc1..3b1a1ec49a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -28,7 +28,6 @@ import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
@@ -97,45 +96,6 @@ case class MergeIntoPaimonDataEvolutionTable(
columns.toSet
}
- private val firstRowIds: immutable.IndexedSeq[Long] = table
- .store()
- .newScan()
- .withManifestEntryFilter(
- entry =>
- entry.file().firstRowId() != null && (!isBlobFile(
- entry
- .file()
- .fileName())))
- .plan()
- .files()
- .asScala
- .map(file => file.file().firstRowId().asInstanceOf[Long])
- .distinct
- .sorted
- .toIndexedSeq
-
- private val firstRowIdToBlobFirstRowIds = {
- val map = new mutable.HashMap[Long, List[Long]]()
- val files = table
- .store()
- .newScan()
- .withManifestEntryFilter(entry => isBlobFile(entry.file().fileName()))
- .plan()
- .files()
- .asScala
- .sortBy(f => f.file().firstRowId())
-
- for (file <- files) {
- val firstRowId = file.file().firstRowId().asInstanceOf[Long]
- val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
- map.update(
- firstIdInNormalFile,
- map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
- )
- }
- map
- }
-
/**
* Self-Merge pattern:
* {{{
@@ -171,27 +131,63 @@ case class MergeIntoPaimonDataEvolutionTable(
"NOT MATCHED BY SOURCE are not supported."
)
- lazy val targetRelation: DataSourceV2Relation = PaimonRelation.getPaimonRelation(targetTable)
+ private lazy val targetRelation: DataSourceV2Relation =
+ PaimonRelation.getPaimonRelation(targetTable)
lazy val tableSchema: StructType = v2Table.schema
override def run(sparkSession: SparkSession): Seq[Row] = {
// Avoid that more than one source rows match the same target row.
val commitMessages = invokeMergeInto(sparkSession)
- writer.commit(commitMessages.toSeq)
+ writer.commit(commitMessages)
Seq.empty[Row]
}
private def invokeMergeInto(sparkSession: SparkSession): Seq[CommitMessage] = {
+ val tableSplits: Seq[DataSplit] = table
+ .newSnapshotReader()
+ .read()
+ .splits()
+ .asScala
+ .map(_.asInstanceOf[DataSplit])
+ .toSeq
+
+ val firstRowIds: immutable.IndexedSeq[Long] = tableSplits
+ .flatMap(_.dataFiles().asScala)
+ .filter(file => file.firstRowId() != null && !isBlobFile(file.fileName()))
+ .map(file => file.firstRowId().asInstanceOf[Long])
+ .distinct
+ .sorted
+ .toIndexedSeq
+
+ val firstRowIdToBlobFirstRowIds: Map[Long, List[Long]] = {
+ val map = new mutable.HashMap[Long, List[Long]]()
+ val files = tableSplits
+ .flatMap(_.dataFiles().asScala)
+ .filter(file => isBlobFile(file.fileName()))
+ .sortBy(f => f.firstRowId())
+
+ for (file <- files) {
+ val firstRowId = file.firstRowId().asInstanceOf[Long]
+ val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
+ map.update(
+ firstIdInNormalFile,
+ map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
+ )
+ }
+ map.toMap
+ }
+
// step 1: find the related data split, make it target file plan
- val dataSplits: Seq[DataSplit] = targetRelatedSplits(sparkSession)
+ val dataSplits: Seq[DataSplit] =
+ targetRelatedSplits(sparkSession, tableSplits, firstRowIds, firstRowIdToBlobFirstRowIds)
val touchedFileTargetRelation =
- createNewScanPlan(dataSplits.toSeq, targetRelation)
+ createNewScanPlan(dataSplits, targetRelation)
// step 2: invoke update action
val updateCommit =
if (matchedActions.nonEmpty) {
- val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation)
+ val updateResult = updateActionInvoke(sparkSession, touchedFileTargetRelation, firstRowIds)
checkUpdateResult(updateResult)
} else Nil
@@ -204,17 +200,15 @@ case class MergeIntoPaimonDataEvolutionTable(
updateCommit ++ insertCommit
}
- private def targetRelatedSplits(sparkSession: SparkSession): Seq[DataSplit] = {
+ private def targetRelatedSplits(
+ sparkSession: SparkSession,
+ tableSplits: Seq[DataSplit],
+ firstRowIds: immutable.IndexedSeq[Long],
+ firstRowIdToBlobFirstRowIds: Map[Long, List[Long]]): Seq[DataSplit] = {
// Self-Merge shortcut:
// In Self-Merge mode, every row in the table may be updated, so we scan all splits.
if (isSelfMergeOnRowId) {
- return table
- .newSnapshotReader()
- .read()
- .splits()
- .asScala
- .map(_.asInstanceOf[DataSplit])
- .toSeq
+ return tableSplits
}
val sourceDss = createDataset(sparkSession, sourceTable)
@@ -222,7 +216,12 @@ case class MergeIntoPaimonDataEvolutionTable(
val firstRowIdsTouched = extractSourceRowIdMapping match {
case Some(sourceRowIdAttr) =>
// Shortcut: Directly get _FIRST_ROW_IDs from the source table.
- findRelatedFirstRowIds(sourceDss, sparkSession, sourceRowIdAttr.name).toSet
+ findRelatedFirstRowIds(
+ sourceDss,
+ sparkSession,
+ firstRowIds,
+ firstRowIdToBlobFirstRowIds,
+ sourceRowIdAttr.name).toSet
case None =>
// Perform the full join to find related _FIRST_ROW_IDs.
@@ -230,25 +229,25 @@ case class MergeIntoPaimonDataEvolutionTable(
findRelatedFirstRowIds(
targetDss.alias("_left").join(sourceDss, toColumn(matchedCondition), "inner"),
sparkSession,
- "_left." + ROW_ID_NAME).toSet
+ firstRowIds,
+ firstRowIdToBlobFirstRowIds,
+ "_left." + ROW_ID_NAME
+ ).toSet
}
- table
- .newSnapshotReader()
- .withManifestEntryFilter(
- entry =>
- entry.file().firstRowId() != null && firstRowIdsTouched.contains(
- entry.file().firstRowId()))
- .read()
- .splits()
- .asScala
- .map(_.asInstanceOf[DataSplit])
- .toSeq
+ tableSplits
+ .map(
+ split =>
+ split.filterDataFile(
+ file => file.firstRowId() != null && firstRowIdsTouched.contains(file.firstRowId())))
+ .filter(optional => optional.isPresent)
+ .map(_.get())
}
private def updateActionInvoke(
sparkSession: SparkSession,
- touchedFileTargetRelation: DataSourceV2Relation): Seq[CommitMessage] = {
+ touchedFileTargetRelation: DataSourceV2Relation,
+ firstRowIds: immutable.IndexedSeq[Long]): Seq[CommitMessage] = {
val mergeFields = extractFields(matchedCondition)
val allFields = mutable.SortedSet.empty[AttributeReference](
(o1, o2) => {
@@ -343,7 +342,7 @@ case class MergeIntoPaimonDataEvolutionTable(
child = readPlan
)
- val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
+ val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
withFirstRowId
} else {
@@ -381,7 +380,7 @@ case class MergeIntoPaimonDataEvolutionTable(
output = output,
child = joinPlan
)
- val withFirstRowId = addFirstRowId(sparkSession, mergeRows)
+ val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds)
assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2)
withFirstRowId
.repartition(col(FIRST_ROW_ID_NAME))
@@ -496,7 +495,6 @@ case class MergeIntoPaimonDataEvolutionTable(
.newIndexFileHandler()
.scan(latestSnapshot.get(), filter)
.asScala
- .toSeq
if (affectedIndexEntries.isEmpty) {
updateCommit
@@ -533,19 +531,19 @@ case class MergeIntoPaimonDataEvolutionTable(
private def findRelatedFirstRowIds(
dataset: Dataset[Row],
sparkSession: SparkSession,
+ firstRowIds: immutable.IndexedSeq[Long],
+ firstRowIdToBlobFirstRowIds: Map[Long, List[Long]],
identifier: String): Array[Long] = {
import sparkSession.implicits._
- val firstRowIdsFinal = firstRowIds
- val firstRowIdToBlobFirstRowIdsFinal = firstRowIdToBlobFirstRowIds
- val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
+ val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
dataset
.select(firstRowIdUdf(col(identifier)))
.distinct()
.as[Long]
.flatMap(
f => {
- if (firstRowIdToBlobFirstRowIdsFinal.contains(f)) {
- firstRowIdToBlobFirstRowIdsFinal(f)
+ if (firstRowIdToBlobFirstRowIds.contains(f)) {
+ firstRowIdToBlobFirstRowIds(f)
} else {
Seq(f)
}
@@ -572,38 +570,38 @@ case class MergeIntoPaimonDataEvolutionTable(
private def attribute(name: String, plan: LogicalPlan) =
plan.output.find(attr => resolver(name, attr.name)).get
- private def addFirstRowId(sparkSession: SparkSession, plan: LogicalPlan): Dataset[Row] = {
+ private def addFirstRowId(
+ sparkSession: SparkSession,
+ plan: LogicalPlan,
+ firstRowIds: immutable.IndexedSeq[Long]): Dataset[Row] = {
assert(plan.output.exists(_.name.equals(ROW_ID_NAME)))
- val firstRowIdsFinal = firstRowIds
- val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIdsFinal, rowId))
+ val firstRowIdUdf = udf((rowId: Long) => floorBinarySearch(firstRowIds, rowId))
val firstRowIdColumn = firstRowIdUdf(col(ROW_ID_NAME))
createDataset(sparkSession, plan).withColumn(FIRST_ROW_ID_NAME, firstRowIdColumn)
}
}
object MergeIntoPaimonDataEvolutionTable {
+
final private val ROW_FROM_SOURCE = "__row_from_source"
final private val ROW_FROM_TARGET = "__row_from_target"
final private val ROW_ID_NAME = "_ROW_ID"
final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID";
- final private val redundantColumns =
- Seq(PaimonMetadataColumn.ROW_ID.toAttribute)
- def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
+ private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
if (indexed.isEmpty) {
throw new IllegalArgumentException("The input sorted sequence is empty.")
}
indexed.search(value) match {
case Found(foundIndex) => indexed(foundIndex)
- case InsertionPoint(insertionIndex) => {
+ case InsertionPoint(insertionIndex) =>
if (insertionIndex == 0) {
throw new IllegalArgumentException(
s"Value $value is less than the first element in the sorted
sequence.")
} else {
indexed(insertionIndex - 1)
}
- }
}
}
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 36feb56f82..e227604759 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -18,9 +18,7 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.CoreOptions
import org.apache.paimon.Snapshot.CommitKind
-import org.apache.paimon.format.FileFormat
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
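
For context, below is a minimal sketch (not part of the patch) of how the new DataSplit#filterDataFile helper added in this commit could be used from Java. The class name FilterDataFileSketch and the touchedFirstRowIds set are hypothetical; only the filterDataFile call, its Predicate<DataFileMeta> argument, and the Optional<DataSplit> result come from the change above.

    import java.util.Optional;
    import java.util.Set;

    import org.apache.paimon.table.source.DataSplit;

    public class FilterDataFileSketch {

        // Keep only the data files of a split whose firstRowId was touched.
        // An empty Optional means the whole split can be skipped instead of re-planned.
        static Optional<DataSplit> keepTouchedFiles(DataSplit split, Set<Long> touchedFirstRowIds) {
            return split.filterDataFile(
                    file -> file.firstRowId() != null && touchedFirstRowIds.contains(file.firstRowId()));
        }
    }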