This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fb9babbb8 [flink][bug] MergeIntoAction and DeleteAction work
abnormally with partial update merge engine (#829)
fb9babbb8 is described below
commit fb9babbb8676d598e817704db4993604b0d3a9d6
Author: yuzelin <[email protected]>
AuthorDate: Fri Apr 7 17:18:13 2023 +0800
[flink][bug] MergeIntoAction and DeleteAction work abnormally with partial
update merge engine (#829)
---
.../paimon/table/AbstractFileStoreTable.java | 9 ++-
.../org/apache/paimon/table/FileStoreTable.java | 3 +
.../org/apache/paimon/flink/action/ActionBase.java | 23 ++++++++
.../apache/paimon/flink/action/DeleteAction.java | 1 +
.../paimon/flink/action/MergeIntoAction.java | 2 +
.../paimon/flink/action/DeleteActionITCase.java | 64 ++++++++++++++++++++++
.../paimon/flink/action/MergeIntoActionITCase.java | 30 ++++++++++
7 files changed, 131 insertions(+), 1 deletion(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index e0427a23b..2810bf3ba 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -108,8 +108,8 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
+ Map<String, String> options = tableSchema.options();
// check option is not immutable
- Map<String, String> options = new HashMap<>(tableSchema.options());
dynamicOptions.forEach(
(k, v) -> {
if (!Objects.equals(v, options.get(k))) {
@@ -117,6 +117,13 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
}
});
+ return internalCopyWithoutCheck(dynamicOptions);
+ }
+
+ @Override
+ public FileStoreTable internalCopyWithoutCheck(Map<String, String>
dynamicOptions) {
+ Map<String, String> options = new HashMap<>(tableSchema.options());
+
// merge non-null dynamic options into schema.options
dynamicOptions.forEach(
(k, v) -> {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index a864f3e4d..71902147b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -72,6 +72,9 @@ public interface FileStoreTable extends DataTable {
@Override
FileStoreTable copy(Map<String, String> dynamicOptions);
+    /**
+     * Copies this table with {@code dynamicOptions} merged into its schema options, like {@link
+     * #copy(Map)}, but without checking whether any immutable option is being changed.
+     *
+     * <p>Sometimes internal features have to change an immutable option, for example an action
+     * that switches the merge engine. Other callers should use {@link #copy(Map)}, which performs
+     * that check before copying.
+     */
+    FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions);
+
FileStoreTable copyWithLatestSchema();
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index a077f7837..528b68f25 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -33,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -45,7 +47,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -147,4 +151,23 @@ public abstract class ActionBase implements Action {
TableEnvironmentUtils.executeInternal(tEnv, transformations,
sinkIdentifierNames);
}
+
+ /**
+ * The {@link CoreOptions.MergeEngine}s will process -U/-D records in
different ways, but we
+ * want these records to be sunk directly. This method is a workaround.
Actions that may produce
+ * -U/-D records can call this to disable merge engine settings and force
compaction.
+ */
+ protected void changeIgnoreMergeEngine() {
+ if (CoreOptions.fromMap(table.options()).mergeEngine()
+ != CoreOptions.MergeEngine.DEDUPLICATE) {
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(
+ CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.DEDUPLICATE.toString());
+ // force compaction
+
dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+ Preconditions.checkArgument(
+ table instanceof FileStoreTable, "Only supports
FileStoreTable.");
+ table = ((FileStoreTable)
table).internalCopyWithoutCheck(dynamicOptions);
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index 1c67cf74a..5f1b54ffb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -45,6 +45,7 @@ public class DeleteAction extends ActionBase {
public DeleteAction(String warehouse, String databaseName, String tableName, String filter) {
super(warehouse, databaseName, tableName);
+        // A delete produces -D records. Unless the table already deduplicates, switch to a
+        // DEDUPLICATE copy of it so those records are written as-is instead of being
+        // interpreted by the table's own merge engine.
+        changeIgnoreMergeEngine();
this.filter = filter;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 859104c7f..5d89c18eb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -131,6 +131,8 @@ public class MergeIntoAction extends ActionBase {
table.getClass().getName()));
}
+ changeIgnoreMergeEngine();
+
// init primaryKeys of target table
primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
if (primaryKeys.isEmpty()) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index 67b663fd7..57617de66 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
@@ -28,6 +29,7 @@ import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -36,10 +38,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
+import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStreamingReadResult;
import static org.assertj.core.api.Assertions.assertThat;
@@ -78,6 +83,65 @@ public class DeleteActionITCase extends ActionITCaseBase {
iterator.close();
}
+    /**
+     * Runs {@link DeleteAction} against a partial-update table (with partial-update ignore-delete
+     * enabled). Checks that the deleted keys show up as -D records in the streaming changelog and
+     * that partial updates of the remaining keys are still merged afterwards.
+     */
+    @Test
+    public void testWorkWithPartialUpdateTable() throws Exception {
+        createFileStoreTable(
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()},
+                        new String[] {"k", "a", "b"}),
+                Collections.emptyList(),
+                Collections.singletonList("k"),
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                CoreOptions.MERGE_ENGINE.key(),
+                                CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
+                        put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true");
+                        // NOTE(review): the producer is picked at random, so a single run covers
+                        // only one of LOOKUP / FULL_COMPACTION, and a failure may not reproduce
+                        // on rerun. Consider parameterizing the test over both producers.
+                        put(
+                                CoreOptions.CHANGELOG_PRODUCER.key(),
+                                ThreadLocalRandom.current().nextBoolean()
+                                        ? CoreOptions.ChangelogProducer.LOOKUP.toString()
+                                        : CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
+                    }
+                });
+
+        DeleteAction action = new DeleteAction(warehouse, database, tableName, "k < 3");
+
+        insertInto(
+                tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'), (4, 'Paimon', 'D')");
+
+        BlockingIterator<Row, Row> streamItr =
+                testStreamingRead(
+                        buildSimpleQuery(tableName),
+                        Arrays.asList(
+                                changelogRow("+I", 1, "Say", "A"),
+                                changelogRow("+I", 2, "Hi", "B"),
+                                changelogRow("+I", 3, "To", "C"),
+                                changelogRow("+I", 4, "Paimon", "D")));
+
+        action.run();
+
+        // The deleted keys (k < 3) must reach the changelog as -D records, not be dropped.
+        validateStreamingReadResult(
+                streamItr,
+                Arrays.asList(changelogRow("-D", 1, "Say", "A"), changelogRow("-D", 2, "Hi", "B")));
+
+        // Partial update still works after the action: two partial rows for k = 4 are merged.
+        insertInto(
+                tableName, "(4, CAST (NULL AS STRING), '$')", "(4, 'Test', CAST (NULL AS STRING))");
+
+        validateStreamingReadResult(
+                streamItr,
+                Arrays.asList(
+                        changelogRow("-U", 4, "Paimon", "D"), changelogRow("+U", 4, "Test", "$")));
+        streamItr.close();
+
+        testBatchRead(
+                buildSimpleQuery(tableName),
+                Arrays.asList(
+                        changelogRow("+I", 3, "To", "C"), changelogRow("+I", 4, "Test", "$")));
+    }
+
private void prepareTable(boolean hasPk) throws Exception {
FileStoreTable table =
createFileStoreTable(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 44e00580e..d562f5160 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -121,6 +121,29 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));
+
+ if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+ // test partial update still works after action
+ testWorkWithPartialUpdate();
+ }
+ }
+
+    /**
+     * Writes two partial rows for key 12 into table {@code T} and checks the merged batch result.
+     * Expects {@code T} to use the partial-update merge engine.
+     */
+    private void testWorkWithPartialUpdate() throws Exception {
+        insertInto(
+                "T",
+                "(12, CAST (NULL AS STRING), '$', '02-29')",
+                "(12, 'Test', CAST (NULL AS STRING), '02-29')");
+
+        // Only key 12 is written here. Its row must combine the non-null fields of both
+        // partial rows ('Test' from the second, '$' from the first).
+        testBatchRead(
+                buildSimpleQuery("T"),
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                        changelogRow("+I", 12, "Test", "$", "02-29")));
}
@ParameterizedTest(name = "in-default = {0}")
@@ -415,6 +438,13 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
new HashMap<String, String>() {
{
put(CHANGELOG_PRODUCER.key(),
producer.toString());
+ // test works with partial update normally
+ if (producer ==
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+ put(
+ CoreOptions.MERGE_ENGINE.key(),
+
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
+
put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true");
+ }
}
}));