This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 922ec95239 [spark] Refactor UpdatePaimonTableCommand.writeUpdatedAndUnchangedData for row tracking
922ec95239 is described below
commit 922ec95239a91ac98019819e881064c7d454ce1f
Author: JingsongLi <[email protected]>
AuthorDate: Fri Oct 10 12:04:50 2025 +0800
[spark] Refactor UpdatePaimonTableCommand.writeUpdatedAndUnchangedData for row tracking
---
.../spark/commands/UpdatePaimonTableCommand.scala | 39 +++++++++++-----------
.../paimon/spark/sql/RowTrackingTestBase.scala | 6 ----
2 files changed, 20 insertions(+), 25 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index dc94777134..75749b5714 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -143,31 +143,32 @@ case class UpdatePaimonTableCommand(
   private def writeUpdatedAndUnchangedData(
       sparkSession: SparkSession,
       toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
+
+    def rowIdCol = col(ROW_ID_COLUMN)
+
+    def sequenceNumberCol = toColumn(
+      optimizedIf(
+        condition,
+        Literal(null),
+        toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
+      .as(SEQUENCE_NUMBER_COLUMN)
+
     var updateColumns = updateExpressions.zip(relation.output).map {
-      case (_, origin) if origin.name == ROW_ID_COLUMN =>
-        col(ROW_ID_COLUMN)
-      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN =>
-        toColumn(
-          optimizedIf(
-            condition,
-            Literal(null),
-            toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
-          .as(SEQUENCE_NUMBER_COLUMN)
+      case (_, origin) if origin.name == ROW_ID_COLUMN => rowIdCol
+      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN => sequenceNumberCol
       case (update, origin) =>
         val updated = optimizedIf(condition, update, origin)
         toColumn(updated).as(origin.name, origin.metadata)
     }
-    if (coreOptions.rowTrackingEnabled() && !relation.outputSet.exists(_.name == ROW_ID_COLUMN)) {
-      updateColumns ++= Seq(
-        col(ROW_ID_COLUMN),
-        toColumn(
-          optimizedIf(
-            condition,
-            Literal(null),
-            toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
-          .as(SEQUENCE_NUMBER_COLUMN)
-      )
+    if (coreOptions.rowTrackingEnabled()) {
+      val outputSet = relation.outputSet
+      if (!outputSet.exists(_.name == ROW_ID_COLUMN)) {
+        updateColumns ++= Seq(rowIdCol)
+      }
+      if (!outputSet.exists(_.name == SEQUENCE_NUMBER_COLUMN)) {
+        updateColumns ++= Seq(sequenceNumberCol)
+      }
     }
     val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
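
For context on the hunk above: the old code appended _ROW_ID and _SEQUENCE_NUMBER together, gated only on _ROW_ID being absent from the relation output, so a plan that already projected _ROW_ID never got _SEQUENCE_NUMBER appended. The refactor checks each row-tracking column independently, and the extracted rowIdCol / sequenceNumberCol helpers let one definition serve both the per-column match and the append path. Below is a minimal, self-contained Scala sketch of that selection logic, with plain strings standing in for Spark Columns; the object and method names are illustrative, not Paimon's API.

// Sketch only: strings stand in for Spark Columns, names are illustrative.
object RowTrackingColumnsSketch {
  val ROW_ID_COLUMN = "_ROW_ID"
  val SEQUENCE_NUMBER_COLUMN = "_SEQUENCE_NUMBER"

  // Each row-tracking column is checked and appended independently,
  // instead of gating both behind a single _ROW_ID existence check.
  def withRowTracking(output: Seq[String], rowTrackingEnabled: Boolean): Seq[String] = {
    var columns = output
    if (rowTrackingEnabled) {
      if (!columns.contains(ROW_ID_COLUMN)) columns :+= ROW_ID_COLUMN
      if (!columns.contains(SEQUENCE_NUMBER_COLUMN)) columns :+= SEQUENCE_NUMBER_COLUMN
    }
    columns
  }

  def main(args: Array[String]): Unit = {
    // A plan that already projects _ROW_ID but not _SEQUENCE_NUMBER: under the
    // old coupled check nothing was appended; now _SEQUENCE_NUMBER still is.
    println(withRowTracking(Seq("id", "data", ROW_ID_COLUMN), rowTrackingEnabled = true))
    // List(id, data, _ROW_ID, _SEQUENCE_NUMBER)
  }
}
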
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 09fa495fd9..422b51a10e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -103,12 +103,6 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
       checkAnswer(
         sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
         Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
       )
-
-      sql("UPDATE t SET data = 111 WHERE _SEQUENCE_NUMBER = 1")
-      checkAnswer(
-        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 111, 0, 4), Row(2, 222, 1, 3), Row(3, 111, 2, 4))
-      )
     }
   }