This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 605bad0ef [core] Fix that cannot read UPDATE_BEFORE of ignore-delete table (#3467)
605bad0ef is described below

commit 605bad0efccc6dd5b8c3e4b6e793f420c134dac1
Author: yuzelin <[email protected]>
AuthorDate: Tue Jun 4 16:00:22 2024 +0800

    [core] Fix that cannot read UPDATE_BEFORE of ignore-delete table (#3467)
---
 .../paimon/io/KeyValueDataFileRecordReader.java    | 22 ++++----------
 .../paimon/io/KeyValueFileReaderFactory.java       |  5 +---
 .../compact/DeduplicateMergeFunction.java          | 27 +++++++++++++++--
 .../mergetree/compact/FirstRowMergeFunction.java   | 35 +++++++++++++++-------
 .../compact/PartialUpdateMergeFunction.java        | 13 ++++++++
 .../apache/paimon/table/PrimaryKeyTableUtils.java  |  4 +--
 .../mergetree/SortBufferWriteBufferTestBase.java   |  1 +
 .../LookupChangelogMergeFunctionWrapperTest.java   |  3 +-
 .../mergetree/compact/SortMergeReaderTestBase.java |  2 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 16 ++++++++++
 .../paimon/flink/ContinuousFileStoreITCase.java    | 17 +++++++----
 .../apache/paimon/flink/PartialUpdateITCase.java   | 30 +++++++++++++------
 12 files changed, 125 insertions(+), 50 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 517720508..e44ad79ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -34,18 +34,12 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
     private final RecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
-    private final boolean ignoreDelete;
 
     public KeyValueDataFileRecordReader(
-            RecordReader<InternalRow> reader,
-            RowType keyType,
-            RowType valueType,
-            int level,
-            boolean ignoreDelete) {
+            RecordReader<InternalRow> reader, RowType keyType, RowType 
valueType, int level) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
-        this.ignoreDelete = ignoreDelete;
     }
 
     @Nullable
@@ -56,15 +50,11 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
             return null;
         }
 
-        RecordIterator<KeyValue> transformed =
-                iterator.transform(
-                        internalRow ->
-                                internalRow == null
-                                        ? null
-                                        : 
serializer.fromRow(internalRow).setLevel(level));
-        // In 0.7- versions, the delete records might be written into data 
file even when
-        // ignore-delete configured, so the reader should also filter the 
delete records
-        return ignoreDelete ? transformed.filter(KeyValue::isAdd) : 
transformed;
+        return iterator.transform(
+                internalRow ->
+                        internalRow == null
+                                ? null
+                                : 
serializer.fromRow(internalRow).setLevel(level));
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 1caff3e68..c0341c208 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -68,7 +68,6 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final BinaryRow partition;
     private final DeletionVector.Factory dvFactory;
-    private final boolean ignoreDelete;
 
     private KeyValueFileReaderFactory(
             FileIO fileIO,
@@ -92,7 +91,6 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
         this.partition = partition;
         this.bulkFormatMappings = new HashMap<>();
         this.dvFactory = dvFactory;
-        this.ignoreDelete = 
CoreOptions.fromMap(schema.options()).ignoreDelete();
     }
 
     @Override
@@ -151,8 +149,7 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                     new ApplyDeletionVectorReader(fileRecordReader, 
deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(
-                fileRecordReader, keyType, valueType, level, ignoreDelete);
+        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, 
valueType, level);
     }
 
     public static Builder builder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index 3d6b341e3..bad9b2916 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.options.Options;
 
 import javax.annotation.Nullable;
 
@@ -28,8 +30,14 @@ import javax.annotation.Nullable;
  */
 public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
 
+    private final boolean ignoreDelete;
+
     private KeyValue latestKv;
 
+    private DeduplicateMergeFunction(boolean ignoreDelete) {
+        this.ignoreDelete = ignoreDelete;
+    }
+
     @Override
     public void reset() {
         latestKv = null;
@@ -37,6 +45,11 @@ public class DeduplicateMergeFunction implements 
MergeFunction<KeyValue> {
 
     @Override
     public void add(KeyValue kv) {
+        // In 0.7- versions, the delete records might be written into data 
file even when
+        // ignore-delete configured, so ignoreDelete still needs to be checked
+        if (ignoreDelete && kv.valueKind().isRetract()) {
+            return;
+        }
         latestKv = kv;
     }
 
@@ -46,16 +59,26 @@ public class DeduplicateMergeFunction implements 
MergeFunction<KeyValue> {
     }
 
     public static MergeFunctionFactory<KeyValue> factory() {
-        return new Factory();
+        return new Factory(false);
+    }
+
+    public static MergeFunctionFactory<KeyValue> factory(Options options) {
+        return new Factory(options.get(CoreOptions.IGNORE_DELETE));
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
         private static final long serialVersionUID = 1L;
 
+        private final boolean ignoreDelete;
+
+        private Factory(boolean ignoreDelete) {
+            this.ignoreDelete = ignoreDelete;
+        }
+
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new DeduplicateMergeFunction();
+            return new DeduplicateMergeFunction(ignoreDelete);
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index 526b7e6f3..b3d9b8661 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -35,10 +36,12 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
     private final InternalRowSerializer valueSerializer;
     private KeyValue first;
     public boolean containsHighLevel;
+    private final boolean ignoreDelete;
 
-    protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
+    protected FirstRowMergeFunction(RowType keyType, RowType valueType, 
boolean ignoreDelete) {
         this.keySerializer = new InternalRowSerializer(keyType);
         this.valueSerializer = new InternalRowSerializer(valueType);
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Override
@@ -49,10 +52,18 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
 
     @Override
     public void add(KeyValue kv) {
-        Preconditions.checkArgument(
-                kv.valueKind().isAdd(),
-                "By default, First row merge engine can not accept 
DELETE/UPDATE_BEFORE records.\n"
-                        + "You can config 'ignore-delete' to ignore the 
DELETE/UPDATE_BEFORE records.");
+        if (kv.valueKind().isRetract()) {
+            // In 0.7- versions, the delete records might be written into data 
file even when
+            // ignore-delete configured, so ignoreDelete still needs to be 
checked
+            if (ignoreDelete) {
+                return;
+            } else {
+                throw new IllegalArgumentException(
+                        "By default, First row merge engine can not accept 
DELETE/UPDATE_BEFORE records.\n"
+                                + "You can config 'first-row.ignore-delete' to 
ignore the DELETE/UPDATE_BEFORE records.");
+            }
+        }
+
         if (first == null) {
             this.first = kv.copy(keySerializer, valueSerializer);
         }
@@ -66,8 +77,10 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         return first;
     }
 
-    public static MergeFunctionFactory<KeyValue> factory(RowType keyType, 
RowType valueType) {
-        return new FirstRowMergeFunction.Factory(keyType, valueType);
+    public static MergeFunctionFactory<KeyValue> factory(
+            Options options, RowType keyType, RowType valueType) {
+        return new FirstRowMergeFunction.Factory(
+                keyType, valueType, options.get(CoreOptions.IGNORE_DELETE));
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -75,15 +88,17 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         private static final long serialVersionUID = 1L;
         private final RowType keyType;
         private final RowType valueType;
+        private final boolean ignoreDelete;
 
-        public Factory(RowType keyType, RowType valueType) {
+        public Factory(RowType keyType, RowType valueType, boolean 
ignoreDelete) {
             this.keyType = keyType;
             this.valueType = valueType;
+            this.ignoreDelete = ignoreDelete;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new FirstRowMergeFunction(keyType, valueType);
+            return new FirstRowMergeFunction(keyType, valueType, ignoreDelete);
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 680e3dd06..737fc5284 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -67,6 +67,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
     public static final String SEQUENCE_GROUP = "sequence-group";
 
     private final InternalRow.FieldGetter[] getters;
+    private final boolean ignoreDelete;
     private final Map<Integer, SequenceGenerator> fieldSequences;
     private final boolean fieldSequenceEnabled;
     private final Map<Integer, FieldAggregator> fieldAggregators;
@@ -78,10 +79,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
     protected PartialUpdateMergeFunction(
             InternalRow.FieldGetter[] getters,
+            boolean ignoreDelete,
             Map<Integer, SequenceGenerator> fieldSequences,
             Map<Integer, FieldAggregator> fieldAggregators,
             boolean fieldSequenceEnabled) {
         this.getters = getters;
+        this.ignoreDelete = ignoreDelete;
         this.fieldSequences = fieldSequences;
         this.fieldAggregators = fieldAggregators;
         this.fieldSequenceEnabled = fieldSequenceEnabled;
@@ -100,6 +103,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         currentKey = kv.key();
 
         if (kv.valueKind().isRetract()) {
+            // In 0.7- versions, the delete records might be written into data 
file even when
+            // ignore-delete configured, so ignoreDelete still needs to be 
checked
+            if (ignoreDelete) {
+                return;
+            }
+
             if (fieldSequenceEnabled) {
                 retractWithSequenceGroup(kv);
                 return;
@@ -216,12 +225,14 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
         private static final long serialVersionUID = 1L;
 
+        private final boolean ignoreDelete;
         private final List<DataType> tableTypes;
         private final Map<Integer, SequenceGenerator> fieldSequences;
 
         private final Map<Integer, FieldAggregator> fieldAggregators;
 
         private Factory(Options options, RowType rowType, List<String> 
primaryKeys) {
+            this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
             this.tableTypes = rowType.getFieldTypes();
 
             List<String> fieldNames = rowType.getFieldNames();
@@ -307,12 +318,14 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
                 return new PartialUpdateMergeFunction(
                         
createFieldGetters(Projection.of(projection).project(tableTypes)),
+                        ignoreDelete,
                         projectedSequences,
                         projectedAggregators,
                         !fieldSequences.isEmpty());
             } else {
                 return new PartialUpdateMergeFunction(
                         createFieldGetters(tableTypes),
+                        ignoreDelete,
                         fieldSequences,
                         fieldAggregators,
                         !fieldSequences.isEmpty());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index b3dbbdd29..c74ab1f1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -58,7 +58,7 @@ public class PrimaryKeyTableUtils {
 
         switch (mergeEngine) {
             case DEDUPLICATE:
-                return DeduplicateMergeFunction.factory();
+                return DeduplicateMergeFunction.factory(conf);
             case PARTIAL_UPDATE:
                 return PartialUpdateMergeFunction.factory(conf, rowType, 
tableSchema.primaryKeys());
             case AGGREGATE:
@@ -69,7 +69,7 @@ public class PrimaryKeyTableUtils {
                         tableSchema.primaryKeys());
             case FIRST_ROW:
                 return FirstRowMergeFunction.factory(
-                        new RowType(extractor.keyFields(tableSchema)), 
rowType);
+                        conf, new RowType(extractor.keyFields(tableSchema)), 
rowType);
             default:
                 throw new UnsupportedOperationException("Unsupported merge 
engine: " + mergeEngine);
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 5a911c006..9315fb15d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -257,6 +257,7 @@ public abstract class SortBufferWriteBufferTestBase {
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
             return FirstRowMergeFunction.factory(
+                            new Options(),
                             new RowType(Lists.list(new DataField(0, "f0", new 
IntType()))),
                             new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))))
                     .create();
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 1a59b8b61..dbee0805a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -392,7 +392,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                         new RowType(
                                                 Lists.list(new DataField(0, 
"f0", new IntType()))),
                                         new RowType(
-                                                Lists.list(new DataField(1, 
"f1", new IntType())))),
+                                                Lists.list(new DataField(1, 
"f1", new IntType()))),
+                                        false),
                         highLevel::contains);
 
         // Without level-0
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 3ae461532..81e49b81d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -130,7 +130,7 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
             RowType keyType = new RowType(Lists.list(new DataField(0, "f0", 
new IntType())));
             RowType valueType = new RowType(Lists.list(new DataField(1, "f1", 
new BigIntType())));
             return new LookupMergeFunction(
-                    new FirstRowMergeFunction(keyType, valueType), keyType, 
valueType);
+                    new FirstRowMergeFunction(keyType, valueType, false), 
keyType, valueType);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 55bd886ad..5c502d88f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -422,6 +422,22 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "B"));
     }
 
+    @Test
+    public void testIgnoreDeleteCompatible() {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
v STRING) "
+                        + "WITH ('merge-engine' = 'deduplicate', 'write-only' 
= 'true')");
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A')");
+        // write delete records
+        sql("DELETE FROM ignore_delete WHERE pk = 1");
+        assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
+
+        // set ignore-delete and read
+        sql("ALTER TABLE ignore_delete set ('ignore-delete' = 'true')");
+        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, "A"));
+    }
+
     @Test
     public void testIgnoreDeleteWithRowKindField() {
         sql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 6f039cef2..c60b26f22 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -553,16 +553,23 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
     public void testIgnoreDelete() throws Exception {
         sql(
                 "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
v STRING) "
-                        + "WITH ('merge-engine' = 'deduplicate', 
'ignore-delete' = 'true', 'bucket' = '1')");
-        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM ignore_delete");
+                        + "WITH ('merge-engine' = 'deduplicate', 
'ignore-delete' = 'true')");
+
+        BlockingIterator<Row, Row> iterator =
+                streamSqlBlockIter(
+                        "SELECT * FROM ignore_delete /*+ 
OPTIONS('continuous.discovery-interval' = '1s') */");
 
         sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
         sql("DELETE FROM ignore_delete WHERE pk = 1");
         sql("INSERT INTO ignore_delete VALUES (1, 'B')");
 
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(
-                        Row.ofKind(RowKind.INSERT, 1, "B"), 
Row.ofKind(RowKind.INSERT, 2, "B"));
+        // no -D[1, 'A'] but exist -U[1, 'A']
+        assertThat(iterator.collect(4))
+                .containsExactly(
+                        Row.ofKind(RowKind.INSERT, 1, "A"),
+                        Row.ofKind(RowKind.INSERT, 2, "B"),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, "A"),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 1, "B"));
         iterator.close();
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index fde5da636..e41df196b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -500,18 +500,18 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
         sql(
                 "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
a STRING, b STRING) WITH ("
                         + " 'merge-engine' = 'partial-update',"
-                        + " 'ignore-delete' = 'true'"
+                        + " 'ignore-delete' = 'true',"
+                        + " 'changelog-producer' = 'lookup'"
                         + ")");
         if (localMerge) {
             sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = 
'256 kb')");
         }
 
+        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 
'apple')");
+
         String id =
                 TestValuesTableFactory.registerData(
-                        Arrays.asList(
-                                Row.ofKind(RowKind.INSERT, 1, null, "apple"),
-                                Row.ofKind(RowKind.DELETE, 1, null, "apple"),
-                                Row.ofKind(RowKind.INSERT, 1, "A", null)));
+                        Collections.singletonList(Row.ofKind(RowKind.DELETE, 
1, null, "apple")));
         streamSqlIter(
                         "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT 
ENFORCED, a STRING, b STRING) "
                                 + "WITH ('connector'='values', 
'bounded'='true', 'data-id'='%s', "
@@ -520,17 +520,30 @@ public class PartialUpdateITCase extends 
CatalogITCaseBase {
                 .close();
         sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM 
input").await();
 
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS 
STRING))");
+
+        // batch read
         assertThat(sql("SELECT * FROM ignore_delete"))
                 .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
+
+        // streaming read results has -U
+        BlockingIterator<Row, Row> iterator =
+                streamSqlBlockIter(
+                        "SELECT * FROM ignore_delete /*+ 
OPTIONS('scan.timestamp-millis' = '0') */");
+        assertThat(iterator.collect(3))
+                .containsExactly(
+                        Row.ofKind(RowKind.INSERT, 1, null, "apple"),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, "apple"),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple"));
+        iterator.close();
     }
 
     @Test
-    public void testIgnoreDeleteInReader() throws Exception {
+    public void testIgnoreDeleteCompatible() throws Exception {
         sql(
                 "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
a STRING, b STRING) WITH ("
                         + " 'merge-engine' = 'deduplicate',"
-                        + " 'write-only' = 'true',"
-                        + " 'bucket' = '1')");
+                        + " 'write-only' = 'true')");
         sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 
'apple')");
         // write delete records
         sql("DELETE FROM ignore_delete WHERE pk = 1");
@@ -542,7 +555,6 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
         Map<String, String> newOptions = new HashMap<>();
         newOptions.put(
                 CoreOptions.MERGE_ENGINE.key(), 
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
-        newOptions.put(CoreOptions.BUCKET.key(), "1");
         newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
         SchemaUtils.forceCommit(
                 new SchemaManager(LocalFileIO.create(), new Path(path, 
"default.db/ignore_delete")),

Reply via email to