This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ef43c411e0 [core] Fix incremental query audit log table with delete (#5225)
ef43c411e0 is described below
commit ef43c411e08d087fa316099331def16101b9e98b
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Mar 7 11:43:04 2025 +0800
[core] Fix incremental query audit log table with delete (#5225)
---
.../source/splitread/IncrementalDiffSplitRead.java | 13 ++++++--
.../spark/sql/TableValuedFunctionsTest.scala | 35 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 3b9f28d191..2827e450af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -191,6 +191,7 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
@Nullable
@Override
public KeyValue getResult() {
+ KeyValue toReturn = null;
if (kvs.size() == 1) {
KeyValue kv = kvs.get(0);
if (kv.level() == BEFORE_LEVEL) {
@@ -198,19 +199,25 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
return kv.replaceValueKind(RowKind.DELETE);
}
} else {
- return kv;
+ toReturn = kv;
}
} else if (kvs.size() == 2) {
KeyValue latest = kvs.get(1);
if (latest.level() == AFTER_LEVEL) {
- if (!valueEquals()) {
- return latest;
+ // Return when the value or rowKind is different. Since before is always add, we
+ // only need to check if after is not add.
+ if (!valueEquals() || !latest.isAdd()) {
+ toReturn = latest;
}
}
} else {
throw new IllegalArgumentException("Illegal kv number: " + kvs.size());
}
+ if (toReturn != null && (keepDelete || toReturn.isAdd())) {
+ return toReturn;
+ }
+
return null;
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 117a5af02d..4f0ca73071 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -285,6 +285,41 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
}
}
+ test("Table Valued Functions: incremental query with delete after compact") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id INT) USING paimon
+ |TBLPROPERTIES ('primary-key'='id', 'bucket' = '1', 'write-only' = 'true')
+ |""".stripMargin)
+
+ sql("INSERT INTO t VALUES 1")
+ sql("INSERT INTO t VALUES 2")
+ sql("CALL sys.create_tag('t', 'tag1')")
+
+ sql("CALL sys.compact(table => 'T')")
+ sql("DELETE FROM t WHERE id = 1")
+ sql("CALL sys.create_tag('t', 'tag2')")
+
+ // tag1 tag2
+ // l0 f(+I 1),f(+I 2) f(-D 1)
+ // l1
+ // l2
+ // l3
+ // l4
+ // l5 f(+I 1,2)
+ checkAnswer(
+ sql("SELECT level FROM `t$files` VERSION AS OF 'tag1' ORDER BY level"),
+ Seq(Row(0), Row(0)))
+ checkAnswer(
+ sql("SELECT level FROM `t$files` VERSION AS OF 'tag2' ORDER BY level"),
+ Seq(Row(0), Row(5)))
+
+ checkAnswer(
+ sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1', 'tag2') ORDER BY id"),
+ Seq(Row("-D", 1)))
+ }
+ }
+
private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
spark.read
.format("paimon")