This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f67a6faf5 [core] Query audit_log with incremental between should see 
DELETE records (#1442)
f67a6faf5 is described below

commit f67a6faf5327c60e76f68b10330b8f83e66abb4a
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 28 20:32:01 2023 +0800

    [core] Query audit_log with incremental between should see DELETE records 
(#1442)
---
 docs/content/how-to/querying-tables.md             | 11 +++++
 .../paimon/operation/KeyValueFileStoreRead.java    | 14 +++++-
 .../table/ChangelogValueCountFileStoreTable.java   |  6 +++
 .../table/ChangelogWithKeyFileStoreTable.java      |  6 +++
 .../apache/paimon/table/source/InnerTableRead.java |  4 ++
 .../apache/paimon/table/system/AuditLogTable.java  |  2 +-
 .../apache/paimon/table/IncrementalTableTest.java  | 52 ++++++++++++++++++++++
 7 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 83a9bb5cc..7be3add0a 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -106,6 +106,17 @@ spark.read()
 
 {{< /tabs >}}
 
+In Batch SQL, the `DELETE` records are not allowed to be returned, so records 
of `-D` will be dropped.
+If you want see `DELETE` records, you can use audit_log table:
+
+{{< tabs "incremental-audit_log" >}}
+{{< tab "Flink" >}}
+```sql
+SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 ## Streaming Query
 
 By default, Streaming read produces the latest snapshot on the table upon 
first startup,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index fdfb3bb6d..9fce1cb85 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -80,6 +80,8 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
     @Nullable private int[][] pushdownProjection;
     @Nullable private int[][] outerProjection;
 
+    private boolean forceKeepDelete = false;
+
     public KeyValueFileStoreRead(
             FileIO fileIO,
             SchemaManager schemaManager,
@@ -131,6 +133,11 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         return this;
     }
 
+    public KeyValueFileStoreRead forceKeepDelete() {
+        this.forceKeepDelete = true;
+        return this;
+    }
+
     @Override
     public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
         List<Predicate> allFilters = new ArrayList<>();
@@ -227,8 +234,11 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                                         mergeFuncWrapper,
                                         mergeSorter));
             }
-            DropDeleteReader reader =
-                    new 
DropDeleteReader(ConcatRecordReader.create(sectionReaders));
+
+            RecordReader<KeyValue> reader = 
ConcatRecordReader.create(sectionReaders);
+            if (!forceKeepDelete) {
+                reader = new DropDeleteReader(reader);
+            }
 
             // Project results from SortMergeReader using 
ProjectKeyRecordReader.
             return keyProjectedFields == null ? reader : projectKey(reader, 
keyProjectedFields);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 98d34dc55..5fa99529d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -141,6 +141,12 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
                 return new ValueCountRowDataRecordIterator(kvRecordIterator);
             }
+
+            @Override
+            public InnerTableRead forceKeepDelete() {
+                read.forceKeepDelete();
+                return this;
+            }
         };
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index bb8a03b36..ec692a013 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -212,6 +212,12 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                     RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
                 return new ValueContentRowDataRecordIterator(kvRecordIterator);
             }
+
+            @Override
+            public InnerTableRead forceKeepDelete() {
+                read.forceKeepDelete();
+                return this;
+            }
         };
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index b1bdb9d52..ea07bc969 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -46,4 +46,8 @@ public interface InnerTableRead extends TableRead {
     }
 
     InnerTableRead withProjection(int[][] projection);
+
+    default InnerTableRead forceKeepDelete() {
+        return this;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index fb1581439..68f2a3b7a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -327,7 +327,7 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         private int[] readProjection;
 
         private AuditLogRead(InnerTableRead dataRead) {
-            this.dataRead = dataRead;
+            this.dataRead = dataRead.forceKeepDelete();
             this.readProjection = defaultProjection();
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 4a6a09e54..b69b5c5e1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.data.BinaryString.fromString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -147,4 +149,54 @@ public class IncrementalTableTest extends TableTestBase {
                         GenericRow.of(1, 2, 4),
                         GenericRow.of(2, 1, 4));
     }
+
+    @Test
+    public void testAuditLog() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        Table table = catalog.getTable(identifier);
+
+        // snapshot 1: append
+        write(
+                table,
+                GenericRow.of(1, 1, 1),
+                GenericRow.of(1, 2, 1),
+                GenericRow.of(1, 3, 1),
+                GenericRow.of(2, 1, 1));
+
+        // snapshot 2: append + and -
+        write(
+                table,
+                GenericRow.ofKind(RowKind.DELETE, 1, 1, 1),
+                GenericRow.ofKind(RowKind.DELETE, 1, 2, 1),
+                GenericRow.of(1, 4, 1),
+                GenericRow.of(2, 1, 2));
+
+        // snapshot 3: append + and -
+        write(
+                table,
+                GenericRow.ofKind(RowKind.DELETE, 1, 3, 1),
+                GenericRow.of(1, 1, 2),
+                GenericRow.of(2, 1, 3),
+                GenericRow.of(2, 2, 1));
+
+        Table auditLog = catalog.getTable(identifier("T$audit_log"));
+        List<InternalRow> result = read(auditLog, Pair.of(INCREMENTAL_BETWEEN, 
"1,3"));
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(fromString("+I"), 2, 1, 3),
+                        GenericRow.of(fromString("+I"), 2, 2, 1),
+                        GenericRow.of(fromString("+I"), 1, 1, 2),
+                        GenericRow.of(fromString("+I"), 1, 4, 1),
+                        GenericRow.of(fromString("-D"), 1, 2, 1),
+                        GenericRow.of(fromString("-D"), 1, 3, 1));
+    }
 }

Reply via email to