This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f67a6faf5 [core] Query audit_log with incremental between should see
DELETE records (#1442)
f67a6faf5 is described below
commit f67a6faf5327c60e76f68b10330b8f83e66abb4a
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 28 20:32:01 2023 +0800
[core] Query audit_log with incremental between should see DELETE records
(#1442)
---
docs/content/how-to/querying-tables.md | 11 +++++
.../paimon/operation/KeyValueFileStoreRead.java | 14 +++++-
.../table/ChangelogValueCountFileStoreTable.java | 6 +++
.../table/ChangelogWithKeyFileStoreTable.java | 6 +++
.../apache/paimon/table/source/InnerTableRead.java | 4 ++
.../apache/paimon/table/system/AuditLogTable.java | 2 +-
.../apache/paimon/table/IncrementalTableTest.java | 52 ++++++++++++++++++++++
7 files changed, 92 insertions(+), 3 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 83a9bb5cc..7be3add0a 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -106,6 +106,17 @@ spark.read()
{{< /tabs >}}
+In Batch SQL, `DELETE` records cannot be returned, so `-D` records
are dropped.
+If you want to see `DELETE` records, you can query the audit_log table:
+
+{{< tabs "incremental-audit_log" >}}
+{{< tab "Flink" >}}
+```sql
+SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
+```
+{{< /tab >}}
+{{< /tabs >}}
+
## Streaming Query
By default, Streaming read produces the latest snapshot on the table upon
first startup,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index fdfb3bb6d..9fce1cb85 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -80,6 +80,8 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
@Nullable private int[][] pushdownProjection;
@Nullable private int[][] outerProjection;
+ private boolean forceKeepDelete = false;
+
public KeyValueFileStoreRead(
FileIO fileIO,
SchemaManager schemaManager,
@@ -131,6 +133,11 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
return this;
}
+ public KeyValueFileStoreRead forceKeepDelete() { // opt-in switch: ask this read to keep DELETE records
+ this.forceKeepDelete = true; // when set, the concatenated section reader is returned without the DropDeleteReader wrapper
+ return this; // same instance, so the call can be chained with other configuration calls
+ }
+
@Override
public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
List<Predicate> allFilters = new ArrayList<>();
@@ -227,8 +234,11 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
mergeFuncWrapper,
mergeSorter));
}
- DropDeleteReader reader =
- new
DropDeleteReader(ConcatRecordReader.create(sectionReaders));
+
+ RecordReader<KeyValue> reader =
ConcatRecordReader.create(sectionReaders);
+ if (!forceKeepDelete) {
+ reader = new DropDeleteReader(reader);
+ }
// Project results from SortMergeReader using
ProjectKeyRecordReader.
return keyProjectedFields == null ? reader : projectKey(reader,
keyProjectedFields);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 98d34dc55..5fa99529d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -141,6 +141,12 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
return new ValueCountRowDataRecordIterator(kvRecordIterator);
}
+
+ @Override
+ public InnerTableRead forceKeepDelete() { // replaces the interface's no-op default for this table's read
+ read.forceKeepDelete(); // forward the request to the wrapped `read` object
+ return this; // return this table read for chaining
+ }
};
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index bb8a03b36..ec692a013 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -212,6 +212,12 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
return new ValueContentRowDataRecordIterator(kvRecordIterator);
}
+
+ @Override
+ public InnerTableRead forceKeepDelete() { // replaces the interface's no-op default for this table's read
+ read.forceKeepDelete(); // forward the request to the wrapped `read` object
+ return this; // return this table read for chaining
+ }
};
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index b1bdb9d52..ea07bc969 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -46,4 +46,8 @@ public interface InnerTableRead extends TableRead {
}
InnerTableRead withProjection(int[][] projection);
+
+ default InnerTableRead forceKeepDelete() { // default is a no-op; implementations that can keep DELETE records override it
+ return this; // unchanged read, returned so callers can chain unconditionally
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index fb1581439..68f2a3b7a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -327,7 +327,7 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
private int[] readProjection;
private AuditLogRead(InnerTableRead dataRead) {
- this.dataRead = dataRead;
+ this.dataRead = dataRead.forceKeepDelete();
this.readProjection = defaultProjection();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 4a6a09e54..b69b5c5e1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Test;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -147,4 +149,54 @@ public class IncrementalTableTest extends TableTestBase {
GenericRow.of(1, 2, 4),
GenericRow.of(2, 1, 4));
}
+
+ @Test
+ public void testAuditLog() throws Exception { // incremental-between read of T$audit_log must include -D rows
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+
+ // snapshot 1: insert four rows, written as (pt, pk, col1): three in pt=1, one in pt=2
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1));
+
+ // snapshot 2: delete (1, 1, 1) and (1, 2, 1); write (1, 4, 1) and (2, 1, 2)
+ write(
+ table,
+ GenericRow.ofKind(RowKind.DELETE, 1, 1, 1),
+ GenericRow.ofKind(RowKind.DELETE, 1, 2, 1),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 1, 2));
+
+ // snapshot 3: delete (1, 3, 1); write (1, 1, 2), (2, 1, 3) and (2, 2, 1)
+ write(
+ table,
+ GenericRow.ofKind(RowKind.DELETE, 1, 3, 1),
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1));
+
+ Table auditLog = catalog.getTable(identifier("T$audit_log"));
+ List<InternalRow> result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN,
"1,3"));
+ assertThat(result) // each expected row: row-kind string first, then pt, pk, col1
+ .containsExactlyInAnyOrder(
+ GenericRow.of(fromString("+I"), 2, 1, 3),
+ GenericRow.of(fromString("+I"), 2, 2, 1),
+ GenericRow.of(fromString("+I"), 1, 1, 2),
+ GenericRow.of(fromString("+I"), 1, 4, 1),
+ GenericRow.of(fromString("-D"), 1, 2, 1),
+ GenericRow.of(fromString("-D"), 1, 3, 1));
+ }
}