This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5add480d8 [core] Fix incompatibility of the ignore-delete option with old delete records and LocalMergeOperator (#3139)
5add480d8 is described below
commit 5add480d8ed6630e2cf7923fd26cc6a5aa9db47f
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 3 10:27:20 2024 +0800
[core] Fix incompatibility of the ignore-delete option with old delete records and LocalMergeOperator (#3139)
---
.../paimon/io/KeyValueDataFileRecordReader.java | 22 +++++--
.../paimon/io/KeyValueFileReaderFactory.java | 5 +-
.../paimon/table/AbstractFileStoreTable.java | 5 --
.../org/apache/paimon/table/FileStoreTable.java | 3 -
.../paimon/flink/action/MergeIntoAction.java | 22 -------
.../paimon/flink/sink/LocalMergeOperator.java | 8 ++-
.../apache/paimon/flink/PartialUpdateITCase.java | 74 ++++++++++++++++++----
.../paimon/flink/action/MergeIntoActionITCase.java | 60 +++++++++++++-----
8 files changed, 135 insertions(+), 64 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index e44ad79ff..517720508 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements
RecordReader<KeyValue> {
private final RecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
+ private final boolean ignoreDelete;
public KeyValueDataFileRecordReader(
- RecordReader<InternalRow> reader, RowType keyType, RowType
valueType, int level) {
+ RecordReader<InternalRow> reader,
+ RowType keyType,
+ RowType valueType,
+ int level,
+ boolean ignoreDelete) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
+ this.ignoreDelete = ignoreDelete;
}
@Nullable
@@ -50,11 +56,15 @@ public class KeyValueDataFileRecordReader implements
RecordReader<KeyValue> {
return null;
}
- return iterator.transform(
- internalRow ->
- internalRow == null
- ? null
- :
serializer.fromRow(internalRow).setLevel(level));
+ RecordIterator<KeyValue> transformed =
+ iterator.transform(
+ internalRow ->
+ internalRow == null
+ ? null
+ :
serializer.fromRow(internalRow).setLevel(level));
+ // In 0.7- versions, the delete records might be written into data
file even when
+ // ignore-delete configured, so the reader should also filter the
delete records
+ return ignoreDelete ? transformed.filter(KeyValue::isAdd) :
transformed;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 27ddd1ff6..e7d091b47 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -68,6 +68,7 @@ public class KeyValueFileReaderFactory {
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
+ private final boolean ignoreDelete;
private KeyValueFileReaderFactory(
FileIO fileIO,
@@ -91,6 +92,7 @@ public class KeyValueFileReaderFactory {
this.partition = partition;
this.bulkFormatMappings = new HashMap<>();
this.dvFactory = dvFactory;
+ this.ignoreDelete =
CoreOptions.fromMap(schema.options()).ignoreDelete();
}
public RecordReader<KeyValue> createRecordReader(
@@ -144,7 +146,8 @@ public class KeyValueFileReaderFactory {
new ApplyDeletionVectorReader<>(fileRecordReader,
deletionVector.get());
}
- return new KeyValueDataFileRecordReader(fileRecordReader, keyType,
valueType, level);
+ return new KeyValueDataFileRecordReader(
+ fileRecordReader, keyType, valueType, level, ignoreDelete);
}
public static Builder builder(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 5b97b8260..99ad605df 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -189,11 +189,6 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
return copyInternal(dynamicOptions, false);
}
- @Override
- public FileStoreTable internalCopyWithoutCheck(Map<String, String>
dynamicOptions) {
- return copyInternal(dynamicOptions, true);
- }
-
private void checkImmutability(Map<String, String> dynamicOptions) {
Map<String, String> options = tableSchema.options();
// check option is not immutable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index a96a6138d..f68e2e19d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -86,9 +86,6 @@ public interface FileStoreTable extends DataTable {
/** Doesn't change table schema even when there exists time travel scan
options. */
FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
- /** Sometimes we have to change some Immutable options to implement
features. */
- FileStoreTable internalCopyWithoutCheck(Map<String, String>
dynamicOptions);
-
/** TODO: this method is weird, old options will overwrite new options. */
FileStoreTable copyWithLatestSchema();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 41f8b7677..4ec5e6ef5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
@@ -39,7 +38,6 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -137,8 +135,6 @@ public class MergeIntoAction extends TableActionBase {
table.getClass().getName()));
}
- changeIgnoreMergeEngine();
-
// init primaryKeys of target table
primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
if (primaryKeys.isEmpty()) {
@@ -161,24 +157,6 @@ public class MergeIntoAction extends TableActionBase {
.collect(Collectors.toList());
}
- /**
- * The {@link CoreOptions.MergeEngine}s will process -U/-D records in
different ways, but we
- * want these records to be sunk directly. This method is a workaround
which disables merge
- * engine settings and force compaction.
- */
- private void changeIgnoreMergeEngine() {
- if (CoreOptions.fromMap(table.options()).mergeEngine()
- != CoreOptions.MergeEngine.DEDUPLICATE) {
- Map<String, String> dynamicOptions = new HashMap<>();
- dynamicOptions.put(
- CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.DEDUPLICATE.toString());
- dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false");
- // force compaction
-
dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
- table = ((FileStoreTable)
table).internalCopyWithoutCheck(dynamicOptions);
- }
- }
-
public MergeIntoAction withTargetAlias(String targetAlias) {
this.targetAlias = targetAlias;
return this;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 0a5eceb49..a09372443 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -57,7 +57,8 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
private static final long serialVersionUID = 1L;
- TableSchema schema;
+ private final TableSchema schema;
+ private final boolean ignoreDelete;
private transient Projection keyProjection;
private transient RecordComparator keyComparator;
@@ -76,6 +77,7 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
schema.primaryKeys().size() > 0,
"LocalMergeOperator currently only support tables with primary
keys");
this.schema = schema;
+ this.ignoreDelete =
CoreOptions.fromMap(schema.options()).ignoreDelete();
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -137,6 +139,10 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
RowKind rowKind =
rowKindGenerator == null ? row.getRowKind() :
rowKindGenerator.generate(row);
+ if (ignoreDelete && rowKind.isRetract()) {
+ return;
+ }
+
// row kind must be INSERT when it is divided into key and value
row.setRowKind(RowKind.INSERT);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 23a574fb5..5dd6a7322 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,6 +18,14 @@
package org.apache.paimon.flink;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.configuration.RestartStrategyOptions;
@@ -29,10 +37,15 @@ import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -409,29 +422,68 @@ public class PartialUpdateITCase extends
CatalogITCaseBase {
insert2.close();
}
- @Test
- public void testIgnoreDelete() throws Exception {
+ @ParameterizedTest(name = "localMergeEnabled = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testIgnoreDelete(boolean localMerge) throws Exception {
sql(
- "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
a INT, g INT) WITH ("
+ "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
a STRING, b STRING) WITH ("
+ " 'merge-engine' = 'partial-update',"
- + " 'ignore-delete' = 'true',"
- + " 'fields.a.aggregate-function' = 'sum',"
- + " 'fields.g.sequence-group'='a')");
+ + " 'ignore-delete' = 'true'"
+ + ")");
+ if (localMerge) {
+ sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' =
'256 kb')");
+ }
String id =
TestValuesTableFactory.registerData(
Arrays.asList(
- Row.ofKind(RowKind.INSERT, 1, 10, 1),
- Row.ofKind(RowKind.DELETE, 1, 10, 2),
- Row.ofKind(RowKind.INSERT, 1, 20, 3)));
+ Row.ofKind(RowKind.INSERT, 1, null, "apple"),
+ Row.ofKind(RowKind.DELETE, 1, null, "apple"),
+ Row.ofKind(RowKind.INSERT, 1, "A", null)));
streamSqlIter(
- "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT
ENFORCED, a INT, g INT) "
+ "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT
ENFORCED, a STRING, b STRING) "
+ "WITH ('connector'='values',
'bounded'='true', 'data-id'='%s', "
+ "'changelog-mode' = 'I,D')",
id)
.close();
sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM
input").await();
- assertThat(sql("SELECT * FROM
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
+ assertThat(sql("SELECT * FROM ignore_delete"))
+ .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
+ }
+
+ @Test
+ public void testIgnoreDeleteInReader() throws Exception {
+ sql(
+ "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
a STRING, b STRING) WITH ("
+ + " 'merge-engine' = 'deduplicate',"
+ + " 'write-only' = 'true',"
+ + " 'bucket' = '1')");
+ sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING),
'apple')");
+ // write delete records
+ sql("DELETE FROM ignore_delete WHERE pk = 1");
+ sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS
STRING))");
+ assertThat(sql("SELECT * FROM ignore_delete"))
+ .containsExactlyInAnyOrder(Row.of(1, "A", null));
+
+ // force altering merge engine and read
+ Map<String, String> newOptions = new HashMap<>();
+ newOptions.put(
+ CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
+ newOptions.put(CoreOptions.BUCKET.key(), "1");
+ newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new Path(path,
"default.db/ignore_delete")),
+ new Schema(
+ Arrays.asList(
+ new DataField(0, "pk",
DataTypes.INT().notNull()),
+ new DataField(1, "a", DataTypes.STRING()),
+ new DataField(2, "b", DataTypes.STRING())),
+ Collections.emptyList(),
+ Collections.singletonList("pk"),
+ newOptions,
+ null));
+ assertThat(sql("SELECT * FROM ignore_delete"))
+ .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 3e27f0c0c..1926299ba 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -130,29 +130,59 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));
-
- if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
- // test partial update still works after action
- testWorkWithPartialUpdate();
- }
}
- private void testWorkWithPartialUpdate() throws Exception {
- insertInto(
- "T",
- "(12, CAST (NULL AS STRING), '$', '02-29')",
- "(12, 'Test', CAST (NULL AS STRING), '02-29')");
+ @Test
+ public void testWorkWithPartialUpdate() throws Exception {
+ // re-create target table with given producer
+ sEnv.executeSql("DROP TABLE T");
+ prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);
- testBatchRead(
- buildSimpleQuery("T"),
+ MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse,
database, "T");
+ action.withSourceTable("S")
+ .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+ .withMatchedUpsert(
+ "T.v <> S.v AND S.v IS NOT NULL", "v = S.v,
last_action = 'matched_upsert'")
+ .withMatchedDelete("S.v IS NULL")
+ .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
+ .withNotMatchedBySourceUpsert(
+ "dt < '02-28'", "v = v || '_nmu', last_action =
'not_matched_upsert'")
+ .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+ // delete records are filtered
+ validateActionRunResult(
+ action.build(),
Arrays.asList(
- changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
changelogRow("+U", 7, "Seven", "matched_upsert",
"02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
- changelogRow("+I", 12, "Test", "$", "02-29")));
+ changelogRow("+I", 12, "v_12", "insert", "02-29")),
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+ changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
+ changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
+ changelogRow("+I", 4, "v_4", "creation", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+I", 7, "Seven", "matched_upsert",
"02-28"),
+ changelogRow("+I", 8, "v_8", "creation", "02-28"),
+ changelogRow("+I", 8, "v_8", "insert", "02-29"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28"),
+ changelogRow("+I", 11, "v_11", "insert", "02-29"),
+ changelogRow("+I", 12, "v_12", "insert", "02-29")));
+
+ // test partial update still works after action
+ insertInto(
+ "T",
+ "(12, CAST (NULL AS STRING), '$', '02-29')",
+ "(12, 'Test', CAST (NULL AS STRING), '02-29')");
+
+ testBatchRead(
+ "SELECT * FROM T WHERE k = 12",
+ Collections.singletonList(changelogRow("+I", 12, "Test", "$",
"02-29")));
}
@ParameterizedTest(name = "in-default = {0}")
@@ -553,7 +583,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
{
put(CHANGELOG_PRODUCER.key(),
producer.toString());
// test works with partial update normally
- if (producer ==
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+ if (producer ==
CoreOptions.ChangelogProducer.LOOKUP) {
put(
CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());