This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 605bad0ef [core] Fix that cannot read UPDATE_BEFORE of ignore-delete
table (#3467)
605bad0ef is described below
commit 605bad0efccc6dd5b8c3e4b6e793f420c134dac1
Author: yuzelin <[email protected]>
AuthorDate: Tue Jun 4 16:00:22 2024 +0800
[core] Fix that cannot read UPDATE_BEFORE of ignore-delete table (#3467)
---
.../paimon/io/KeyValueDataFileRecordReader.java | 22 ++++----------
.../paimon/io/KeyValueFileReaderFactory.java | 5 +---
.../compact/DeduplicateMergeFunction.java | 27 +++++++++++++++--
.../mergetree/compact/FirstRowMergeFunction.java | 35 +++++++++++++++-------
.../compact/PartialUpdateMergeFunction.java | 13 ++++++++
.../apache/paimon/table/PrimaryKeyTableUtils.java | 4 +--
.../mergetree/SortBufferWriteBufferTestBase.java | 1 +
.../LookupChangelogMergeFunctionWrapperTest.java | 3 +-
.../mergetree/compact/SortMergeReaderTestBase.java | 2 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 16 ++++++++++
.../paimon/flink/ContinuousFileStoreITCase.java | 17 +++++++----
.../apache/paimon/flink/PartialUpdateITCase.java | 30 +++++++++++++------
12 files changed, 125 insertions(+), 50 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 517720508..e44ad79ff 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -34,18 +34,12 @@ public class KeyValueDataFileRecordReader implements
RecordReader<KeyValue> {
private final RecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
- private final boolean ignoreDelete;
public KeyValueDataFileRecordReader(
- RecordReader<InternalRow> reader,
- RowType keyType,
- RowType valueType,
- int level,
- boolean ignoreDelete) {
+ RecordReader<InternalRow> reader, RowType keyType, RowType
valueType, int level) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
- this.ignoreDelete = ignoreDelete;
}
@Nullable
@@ -56,15 +50,11 @@ public class KeyValueDataFileRecordReader implements
RecordReader<KeyValue> {
return null;
}
- RecordIterator<KeyValue> transformed =
- iterator.transform(
- internalRow ->
- internalRow == null
- ? null
- :
serializer.fromRow(internalRow).setLevel(level));
- // In 0.7- versions, the delete records might be written into data
file even when
- // ignore-delete configured, so the reader should also filter the
delete records
- return ignoreDelete ? transformed.filter(KeyValue::isAdd) :
transformed;
+ return iterator.transform(
+ internalRow ->
+ internalRow == null
+ ? null
+ :
serializer.fromRow(internalRow).setLevel(level));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 1caff3e68..c0341c208 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -68,7 +68,6 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
- private final boolean ignoreDelete;
private KeyValueFileReaderFactory(
FileIO fileIO,
@@ -92,7 +91,6 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.dvFactory = dvFactory;
- this.ignoreDelete =
CoreOptions.fromMap(schema.options()).ignoreDelete();
}
@Override
@@ -151,8 +149,7 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
new ApplyDeletionVectorReader(fileRecordReader,
deletionVector.get());
}
- return new KeyValueDataFileRecordReader(
- fileRecordReader, keyType, valueType, level, ignoreDelete);
+ return new KeyValueDataFileRecordReader(fileRecordReader, keyType,
valueType, level);
}
public static Builder builder(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index 3d6b341e3..bad9b2916 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -18,7 +18,9 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.options.Options;
import javax.annotation.Nullable;
@@ -28,8 +30,14 @@ import javax.annotation.Nullable;
*/
public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
+ private final boolean ignoreDelete;
+
private KeyValue latestKv;
+ private DeduplicateMergeFunction(boolean ignoreDelete) {
+ this.ignoreDelete = ignoreDelete;
+ }
+
@Override
public void reset() {
latestKv = null;
@@ -37,6 +45,11 @@ public class DeduplicateMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
+ // In 0.7- versions, the delete records might be written into data
file even when
+ // ignore-delete configured, so ignoreDelete still needs to be checked
+ if (ignoreDelete && kv.valueKind().isRetract()) {
+ return;
+ }
latestKv = kv;
}
@@ -46,16 +59,26 @@ public class DeduplicateMergeFunction implements
MergeFunction<KeyValue> {
}
public static MergeFunctionFactory<KeyValue> factory() {
- return new Factory();
+ return new Factory(false);
+ }
+
+ public static MergeFunctionFactory<KeyValue> factory(Options options) {
+ return new Factory(options.get(CoreOptions.IGNORE_DELETE));
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
private static final long serialVersionUID = 1L;
+ private final boolean ignoreDelete;
+
+ private Factory(boolean ignoreDelete) {
+ this.ignoreDelete = ignoreDelete;
+ }
+
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new DeduplicateMergeFunction();
+ return new DeduplicateMergeFunction(ignoreDelete);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index 526b7e6f3..b3d9b8661 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -18,10 +18,11 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -35,10 +36,12 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
private final InternalRowSerializer valueSerializer;
private KeyValue first;
public boolean containsHighLevel;
+ private final boolean ignoreDelete;
- protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
+ protected FirstRowMergeFunction(RowType keyType, RowType valueType,
boolean ignoreDelete) {
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
+ this.ignoreDelete = ignoreDelete;
}
@Override
@@ -49,10 +52,18 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
- Preconditions.checkArgument(
- kv.valueKind().isAdd(),
- "By default, First row merge engine can not accept
DELETE/UPDATE_BEFORE records.\n"
- + "You can config 'ignore-delete' to ignore the
DELETE/UPDATE_BEFORE records.");
+ if (kv.valueKind().isRetract()) {
+ // In 0.7- versions, the delete records might be written into data
file even when
+ // ignore-delete configured, so ignoreDelete still needs to be
checked
+ if (ignoreDelete) {
+ return;
+ } else {
+ throw new IllegalArgumentException(
+ "By default, First row merge engine can not accept
DELETE/UPDATE_BEFORE records.\n"
+ + "You can config 'first-row.ignore-delete' to
ignore the DELETE/UPDATE_BEFORE records.");
+ }
+ }
+
if (first == null) {
this.first = kv.copy(keySerializer, valueSerializer);
}
@@ -66,8 +77,10 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
return first;
}
- public static MergeFunctionFactory<KeyValue> factory(RowType keyType,
RowType valueType) {
- return new FirstRowMergeFunction.Factory(keyType, valueType);
+ public static MergeFunctionFactory<KeyValue> factory(
+ Options options, RowType keyType, RowType valueType) {
+ return new FirstRowMergeFunction.Factory(
+ keyType, valueType, options.get(CoreOptions.IGNORE_DELETE));
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -75,15 +88,17 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
private static final long serialVersionUID = 1L;
private final RowType keyType;
private final RowType valueType;
+ private final boolean ignoreDelete;
- public Factory(RowType keyType, RowType valueType) {
+ public Factory(RowType keyType, RowType valueType, boolean
ignoreDelete) {
this.keyType = keyType;
this.valueType = valueType;
+ this.ignoreDelete = ignoreDelete;
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new FirstRowMergeFunction(keyType, valueType);
+ return new FirstRowMergeFunction(keyType, valueType, ignoreDelete);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 680e3dd06..737fc5284 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -67,6 +67,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
public static final String SEQUENCE_GROUP = "sequence-group";
private final InternalRow.FieldGetter[] getters;
+ private final boolean ignoreDelete;
private final Map<Integer, SequenceGenerator> fieldSequences;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
@@ -78,10 +79,12 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
protected PartialUpdateMergeFunction(
InternalRow.FieldGetter[] getters,
+ boolean ignoreDelete,
Map<Integer, SequenceGenerator> fieldSequences,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
this.getters = getters;
+ this.ignoreDelete = ignoreDelete;
this.fieldSequences = fieldSequences;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
@@ -100,6 +103,12 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
currentKey = kv.key();
if (kv.valueKind().isRetract()) {
+ // In 0.7- versions, the delete records might be written into data
file even when
+ // ignore-delete configured, so ignoreDelete still needs to be
checked
+ if (ignoreDelete) {
+ return;
+ }
+
if (fieldSequenceEnabled) {
retractWithSequenceGroup(kv);
return;
@@ -216,12 +225,14 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
private static final long serialVersionUID = 1L;
+ private final boolean ignoreDelete;
private final List<DataType> tableTypes;
private final Map<Integer, SequenceGenerator> fieldSequences;
private final Map<Integer, FieldAggregator> fieldAggregators;
private Factory(Options options, RowType rowType, List<String>
primaryKeys) {
+ this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.tableTypes = rowType.getFieldTypes();
List<String> fieldNames = rowType.getFieldNames();
@@ -307,12 +318,14 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
return new PartialUpdateMergeFunction(
createFieldGetters(Projection.of(projection).project(tableTypes)),
+ ignoreDelete,
projectedSequences,
projectedAggregators,
!fieldSequences.isEmpty());
} else {
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
+ ignoreDelete,
fieldSequences,
fieldAggregators,
!fieldSequences.isEmpty());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index b3dbbdd29..c74ab1f1f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -58,7 +58,7 @@ public class PrimaryKeyTableUtils {
switch (mergeEngine) {
case DEDUPLICATE:
- return DeduplicateMergeFunction.factory();
+ return DeduplicateMergeFunction.factory(conf);
case PARTIAL_UPDATE:
return PartialUpdateMergeFunction.factory(conf, rowType,
tableSchema.primaryKeys());
case AGGREGATE:
@@ -69,7 +69,7 @@ public class PrimaryKeyTableUtils {
tableSchema.primaryKeys());
case FIRST_ROW:
return FirstRowMergeFunction.factory(
- new RowType(extractor.keyFields(tableSchema)),
rowType);
+ conf, new RowType(extractor.keyFields(tableSchema)),
rowType);
default:
throw new UnsupportedOperationException("Unsupported merge
engine: " + mergeEngine);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 5a911c006..9315fb15d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -257,6 +257,7 @@ public abstract class SortBufferWriteBufferTestBase {
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
return FirstRowMergeFunction.factory(
+ new Options(),
new RowType(Lists.list(new DataField(0, "f0", new
IntType()))),
new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))))
.create();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 1a59b8b61..dbee0805a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -392,7 +392,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
new RowType(
Lists.list(new DataField(0,
"f0", new IntType()))),
new RowType(
- Lists.list(new DataField(1,
"f1", new IntType())))),
+ Lists.list(new DataField(1,
"f1", new IntType()))),
+ false),
highLevel::contains);
// Without level-0
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 3ae461532..81e49b81d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -130,7 +130,7 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
RowType keyType = new RowType(Lists.list(new DataField(0, "f0",
new IntType())));
RowType valueType = new RowType(Lists.list(new DataField(1, "f1",
new BigIntType())));
return new LookupMergeFunction(
- new FirstRowMergeFunction(keyType, valueType), keyType,
valueType);
+ new FirstRowMergeFunction(keyType, valueType, false),
keyType, valueType);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 55bd886ad..5c502d88f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -422,6 +422,22 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "B"));
}
+ @Test
+ public void testIgnoreDeleteCompatible() {
+ sql(
+ "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
v STRING) "
+ + "WITH ('merge-engine' = 'deduplicate', 'write-only'
= 'true')");
+
+ sql("INSERT INTO ignore_delete VALUES (1, 'A')");
+ // write delete records
+ sql("DELETE FROM ignore_delete WHERE pk = 1");
+ assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
+
+ // set ignore-delete and read
+ sql("ALTER TABLE ignore_delete set ('ignore-delete' = 'true')");
+ assertThat(sql("SELECT * FROM
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, "A"));
+ }
+
@Test
public void testIgnoreDeleteWithRowKindField() {
sql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 6f039cef2..c60b26f22 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -553,16 +553,23 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
public void testIgnoreDelete() throws Exception {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
v STRING) "
- + "WITH ('merge-engine' = 'deduplicate',
'ignore-delete' = 'true', 'bucket' = '1')");
- BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM ignore_delete");
+ + "WITH ('merge-engine' = 'deduplicate',
'ignore-delete' = 'true')");
+
+ BlockingIterator<Row, Row> iterator =
+ streamSqlBlockIter(
+ "SELECT * FROM ignore_delete /*+
OPTIONS('continuous.discovery-interval' = '1s') */");
sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
sql("DELETE FROM ignore_delete WHERE pk = 1");
sql("INSERT INTO ignore_delete VALUES (1, 'B')");
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(
- Row.ofKind(RowKind.INSERT, 1, "B"),
Row.ofKind(RowKind.INSERT, 2, "B"));
+ // no -D[1, 'A'] but exist -U[1, 'A']
+ assertThat(iterator.collect(4))
+ .containsExactly(
+ Row.ofKind(RowKind.INSERT, 1, "A"),
+ Row.ofKind(RowKind.INSERT, 2, "B"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, "A"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, "B"));
iterator.close();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index fde5da636..e41df196b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -500,18 +500,18 @@ public class PartialUpdateITCase extends
CatalogITCaseBase {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
a STRING, b STRING) WITH ("
+ " 'merge-engine' = 'partial-update',"
- + " 'ignore-delete' = 'true'"
+ + " 'ignore-delete' = 'true',"
+ + " 'changelog-producer' = 'lookup'"
+ ")");
if (localMerge) {
sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' =
'256 kb')");
}
+ sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING),
'apple')");
+
String id =
TestValuesTableFactory.registerData(
- Arrays.asList(
- Row.ofKind(RowKind.INSERT, 1, null, "apple"),
- Row.ofKind(RowKind.DELETE, 1, null, "apple"),
- Row.ofKind(RowKind.INSERT, 1, "A", null)));
+ Collections.singletonList(Row.ofKind(RowKind.DELETE,
1, null, "apple")));
streamSqlIter(
"CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT
ENFORCED, a STRING, b STRING) "
+ "WITH ('connector'='values',
'bounded'='true', 'data-id'='%s', "
@@ -520,17 +520,30 @@ public class PartialUpdateITCase extends
CatalogITCaseBase {
.close();
sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM
input").await();
+ sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS
STRING))");
+
+ // batch read
assertThat(sql("SELECT * FROM ignore_delete"))
.containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
+
+ // streaming read results has -U
+ BlockingIterator<Row, Row> iterator =
+ streamSqlBlockIter(
+ "SELECT * FROM ignore_delete /*+
OPTIONS('scan.timestamp-millis' = '0') */");
+ assertThat(iterator.collect(3))
+ .containsExactly(
+ Row.ofKind(RowKind.INSERT, 1, null, "apple"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, null, "apple"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple"));
+ iterator.close();
}
@Test
- public void testIgnoreDeleteInReader() throws Exception {
+ public void testIgnoreDeleteCompatible() throws Exception {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
a STRING, b STRING) WITH ("
+ " 'merge-engine' = 'deduplicate',"
- + " 'write-only' = 'true',"
- + " 'bucket' = '1')");
+ + " 'write-only' = 'true')");
sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING),
'apple')");
// write delete records
sql("DELETE FROM ignore_delete WHERE pk = 1");
@@ -542,7 +555,6 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
Map<String, String> newOptions = new HashMap<>();
newOptions.put(
CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
- newOptions.put(CoreOptions.BUCKET.key(), "1");
newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
SchemaUtils.forceCommit(
new SchemaManager(LocalFileIO.create(), new Path(path,
"default.db/ignore_delete")),