This is an automated email from the ASF dual-hosted git repository.
aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8255e3376c [spark] Eliminate the union stage when merging into without notMatchedActions (#5195)
8255e3376c is described below
commit 8255e3376ccd4218084ce01c3b0c52bad8750e32
Author: WenjunMin <[email protected]>
AuthorDate: Mon Mar 10 22:51:25 2025 +0800
[spark] Eliminate the union stage when merging into without notMatchedActions (#5195)
---
.../spark/commands/MergeIntoPaimonTable.scala | 41 ++++++++++++----------
1 file changed, 23 insertions(+), 18 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 8a70ca384d..a1c2abcc52 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -144,29 +144,38 @@ case class MergeIntoPaimonTable(
}
} else {
val touchedFilePathsSet = mutable.Set.empty[String]
+ val intersectionFilePaths = mutable.Set.empty[String]
+
def hasUpdate(actions: Seq[MergeAction]): Boolean = {
actions.exists {
case _: UpdateAction | _: DeleteAction => true
case _ => false
}
}
- if (hasUpdate(matchedActions) || notMatchedActions.nonEmpty) {
- touchedFilePathsSet ++= findTouchedFiles(
- targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition),
"inner"),
+
+ def findTouchedFiles0(joinType: String): Array[String] = {
+ findTouchedFiles(
+ targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition),
joinType),
sparkSession,
- "_left." + FILE_PATH_COLUMN
- )
+ "_left." + FILE_PATH_COLUMN)
}
+
+ if (hasUpdate(matchedActions)) {
+ touchedFilePathsSet ++= findTouchedFiles0("inner")
+ } else if (notMatchedActions.nonEmpty) {
+ intersectionFilePaths ++= findTouchedFiles0("inner")
+ }
+
if (hasUpdate(notMatchedBySourceActions)) {
- touchedFilePathsSet ++= findTouchedFiles(
- targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition),
"left_anti"),
- sparkSession,
- "_left." + FILE_PATH_COLUMN)
+ touchedFilePathsSet ++= findTouchedFiles0("left_anti")
}
- val targetFilePaths: Array[String] = findTouchedFiles(targetDS,
sparkSession)
val touchedFilePaths: Array[String] = touchedFilePathsSet.toArray
- val unTouchedFilePaths =
targetFilePaths.filterNot(touchedFilePaths.contains)
+ val unTouchedFilePaths = if (notMatchedActions.nonEmpty) {
+ intersectionFilePaths.diff(touchedFilePathsSet).toArray
+ } else {
+ Array[String]()
+ }
val (touchedFiles, touchedFileRelation) =
createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
@@ -177,13 +186,9 @@ case class MergeIntoPaimonTable(
// modified and was from touched file, it should be kept too.
val touchedDsWithFileTouchedCol = createDataset(sparkSession,
touchedFileRelation)
.withColumn(FILE_TOUCHED_COL, lit(true))
- val targetDSWithFileTouchedCol = if (notMatchedBySourceActions.nonEmpty)
{
- touchedDsWithFileTouchedCol.union(
- createDataset(sparkSession, unTouchedFileRelation)
- .withColumn(FILE_TOUCHED_COL, lit(false)))
- } else {
- touchedDsWithFileTouchedCol
- }
+ val targetDSWithFileTouchedCol = touchedDsWithFileTouchedCol.union(
+ createDataset(sparkSession, unTouchedFileRelation)
+ .withColumn(FILE_TOUCHED_COL, lit(false)))
val toWriteDS =
constructChangedRows(sparkSession,
targetDSWithFileTouchedCol).drop(ROW_KIND_COL)