This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5add480d8 [core] Fix that ignore-delete option is not compatible with 
old delete records and LocalMergeOperator (#3139)
5add480d8 is described below

commit 5add480d8ed6630e2cf7923fd26cc6a5aa9db47f
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 3 10:27:20 2024 +0800

    [core] Fix that ignore-delete option is not compatible with old delete 
records and LocalMergeOperator (#3139)
---
 .../paimon/io/KeyValueDataFileRecordReader.java    | 22 +++++--
 .../paimon/io/KeyValueFileReaderFactory.java       |  5 +-
 .../paimon/table/AbstractFileStoreTable.java       |  5 --
 .../org/apache/paimon/table/FileStoreTable.java    |  3 -
 .../paimon/flink/action/MergeIntoAction.java       | 22 -------
 .../paimon/flink/sink/LocalMergeOperator.java      |  8 ++-
 .../apache/paimon/flink/PartialUpdateITCase.java   | 74 ++++++++++++++++++----
 .../paimon/flink/action/MergeIntoActionITCase.java | 60 +++++++++++++-----
 8 files changed, 135 insertions(+), 64 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index e44ad79ff..517720508 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -34,12 +34,18 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
     private final RecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
+    private final boolean ignoreDelete;
 
     public KeyValueDataFileRecordReader(
-            RecordReader<InternalRow> reader, RowType keyType, RowType 
valueType, int level) {
+            RecordReader<InternalRow> reader,
+            RowType keyType,
+            RowType valueType,
+            int level,
+            boolean ignoreDelete) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Nullable
@@ -50,11 +56,15 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
             return null;
         }
 
-        return iterator.transform(
-                internalRow ->
-                        internalRow == null
-                                ? null
-                                : 
serializer.fromRow(internalRow).setLevel(level));
+        RecordIterator<KeyValue> transformed =
+                iterator.transform(
+                        internalRow ->
+                                internalRow == null
+                                        ? null
+                                        : 
serializer.fromRow(internalRow).setLevel(level));
+        // In versions 0.7 and earlier, delete records might be written into data files even when
+        // ignore-delete is configured, so the reader must also filter out delete records.
+        return ignoreDelete ? transformed.filter(KeyValue::isAdd) : 
transformed;
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 27ddd1ff6..e7d091b47 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -68,6 +68,7 @@ public class KeyValueFileReaderFactory {
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final BinaryRow partition;
     private final DeletionVector.Factory dvFactory;
+    private final boolean ignoreDelete;
 
     private KeyValueFileReaderFactory(
             FileIO fileIO,
@@ -91,6 +92,7 @@ public class KeyValueFileReaderFactory {
         this.partition = partition;
         this.bulkFormatMappings = new HashMap<>();
         this.dvFactory = dvFactory;
+        this.ignoreDelete = 
CoreOptions.fromMap(schema.options()).ignoreDelete();
     }
 
     public RecordReader<KeyValue> createRecordReader(
@@ -144,7 +146,8 @@ public class KeyValueFileReaderFactory {
                     new ApplyDeletionVectorReader<>(fileRecordReader, 
deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, 
valueType, level);
+        return new KeyValueDataFileRecordReader(
+                fileRecordReader, keyType, valueType, level, ignoreDelete);
     }
 
     public static Builder builder(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 5b97b8260..99ad605df 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -189,11 +189,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         return copyInternal(dynamicOptions, false);
     }
 
-    @Override
-    public FileStoreTable internalCopyWithoutCheck(Map<String, String> 
dynamicOptions) {
-        return copyInternal(dynamicOptions, true);
-    }
-
     private void checkImmutability(Map<String, String> dynamicOptions) {
         Map<String, String> options = tableSchema.options();
         // check option is not immutable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index a96a6138d..f68e2e19d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -86,9 +86,6 @@ public interface FileStoreTable extends DataTable {
     /** Doesn't change table schema even when there exists time travel scan 
options. */
     FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
 
-    /** Sometimes we have to change some Immutable options to implement 
features. */
-    FileStoreTable internalCopyWithoutCheck(Map<String, String> 
dynamicOptions);
-
     /** TODO: this method is weird, old options will overwrite new options. */
     FileStoreTable copyWithLatestSchema();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 41f8b7677..4ec5e6ef5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
@@ -39,7 +38,6 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -137,8 +135,6 @@ public class MergeIntoAction extends TableActionBase {
                             table.getClass().getName()));
         }
 
-        changeIgnoreMergeEngine();
-
         // init primaryKeys of target table
         primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
         if (primaryKeys.isEmpty()) {
@@ -161,24 +157,6 @@ public class MergeIntoAction extends TableActionBase {
                         .collect(Collectors.toList());
     }
 
-    /**
-     * The {@link CoreOptions.MergeEngine}s will process -U/-D records in 
different ways, but we
-     * want these records to be sunk directly. This method is a workaround 
which disables merge
-     * engine settings and force compaction.
-     */
-    private void changeIgnoreMergeEngine() {
-        if (CoreOptions.fromMap(table.options()).mergeEngine()
-                != CoreOptions.MergeEngine.DEDUPLICATE) {
-            Map<String, String> dynamicOptions = new HashMap<>();
-            dynamicOptions.put(
-                    CoreOptions.MERGE_ENGINE.key(), 
CoreOptions.MergeEngine.DEDUPLICATE.toString());
-            dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false");
-            // force compaction
-            
dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
-            table = ((FileStoreTable) 
table).internalCopyWithoutCheck(dynamicOptions);
-        }
-    }
-
     public MergeIntoAction withTargetAlias(String targetAlias) {
         this.targetAlias = targetAlias;
         return this;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 0a5eceb49..a09372443 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -57,7 +57,8 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
     private static final long serialVersionUID = 1L;
 
-    TableSchema schema;
+    private final TableSchema schema;
+    private final boolean ignoreDelete;
 
     private transient Projection keyProjection;
     private transient RecordComparator keyComparator;
@@ -76,6 +77,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
                 schema.primaryKeys().size() > 0,
                 "LocalMergeOperator currently only support tables with primary 
keys");
         this.schema = schema;
+        this.ignoreDelete = 
CoreOptions.fromMap(schema.options()).ignoreDelete();
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
@@ -137,6 +139,10 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
         RowKind rowKind =
                 rowKindGenerator == null ? row.getRowKind() : 
rowKindGenerator.generate(row);
+        if (ignoreDelete && rowKind.isRetract()) {
+            return;
+        }
+
         // row kind must be INSERT when it is divided into key and value
         row.setRowKind(RowKind.INSERT);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 23a574fb5..5dd6a7322 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,6 +18,14 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -29,10 +37,15 @@ import org.apache.flink.util.CloseableIterator;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -409,29 +422,68 @@ public class PartialUpdateITCase extends 
CatalogITCaseBase {
         insert2.close();
     }
 
-    @Test
-    public void testIgnoreDelete() throws Exception {
+    @ParameterizedTest(name = "localMergeEnabled = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testIgnoreDelete(boolean localMerge) throws Exception {
         sql(
-                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
a INT, g INT) WITH ("
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
a STRING, b STRING) WITH ("
                         + " 'merge-engine' = 'partial-update',"
-                        + " 'ignore-delete' = 'true',"
-                        + " 'fields.a.aggregate-function' = 'sum',"
-                        + " 'fields.g.sequence-group'='a')");
+                        + " 'ignore-delete' = 'true'"
+                        + ")");
+        if (localMerge) {
+            sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = 
'256 kb')");
+        }
 
         String id =
                 TestValuesTableFactory.registerData(
                         Arrays.asList(
-                                Row.ofKind(RowKind.INSERT, 1, 10, 1),
-                                Row.ofKind(RowKind.DELETE, 1, 10, 2),
-                                Row.ofKind(RowKind.INSERT, 1, 20, 3)));
+                                Row.ofKind(RowKind.INSERT, 1, null, "apple"),
+                                Row.ofKind(RowKind.DELETE, 1, null, "apple"),
+                                Row.ofKind(RowKind.INSERT, 1, "A", null)));
         streamSqlIter(
-                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT 
ENFORCED, a INT, g INT) "
+                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT 
ENFORCED, a STRING, b STRING) "
                                 + "WITH ('connector'='values', 
'bounded'='true', 'data-id'='%s', "
                                 + "'changelog-mode' = 'I,D')",
                         id)
                 .close();
         sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM 
input").await();
 
-        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
+    }
+
+    @Test
+    public void testIgnoreDeleteInReader() throws Exception {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
a STRING, b STRING) WITH ("
+                        + " 'merge-engine' = 'deduplicate',"
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '1')");
+        sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 
'apple')");
+        // write delete records
+        sql("DELETE FROM ignore_delete WHERE pk = 1");
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS 
STRING))");
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", null));
+
+        // force altering merge engine and read
+        Map<String, String> newOptions = new HashMap<>();
+        newOptions.put(
+                CoreOptions.MERGE_ENGINE.key(), 
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
+        newOptions.put(CoreOptions.BUCKET.key(), "1");
+        newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
+        SchemaUtils.forceCommit(
+                new SchemaManager(LocalFileIO.create(), new Path(path, 
"default.db/ignore_delete")),
+                new Schema(
+                        Arrays.asList(
+                                new DataField(0, "pk", 
DataTypes.INT().notNull()),
+                                new DataField(1, "a", DataTypes.STRING()),
+                                new DataField(2, "b", DataTypes.STRING())),
+                        Collections.emptyList(),
+                        Collections.singletonList("pk"),
+                        newOptions,
+                        null));
+        assertThat(sql("SELECT * FROM ignore_delete"))
+                .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 3e27f0c0c..1926299ba 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -130,29 +130,59 @@ public class MergeIntoActionITCase extends 
ActionITCaseBase {
                         changelogRow("+I", 8, "v_8", "insert", "02-29"),
                         changelogRow("+I", 11, "v_11", "insert", "02-29"),
                         changelogRow("+I", 12, "v_12", "insert", "02-29")));
-
-        if (producer == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
-            // test partial update still works after action
-            testWorkWithPartialUpdate();
-        }
     }
 
-    private void testWorkWithPartialUpdate() throws Exception {
-        insertInto(
-                "T",
-                "(12, CAST (NULL AS STRING), '$', '02-29')",
-                "(12, 'Test', CAST (NULL AS STRING), '02-29')");
+    @Test
+    public void testWorkWithPartialUpdate() throws Exception {
+        // re-create target table with the LOOKUP changelog producer
+        sEnv.executeSql("DROP TABLE T");
+        prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);
 
-        testBatchRead(
-                buildSimpleQuery("T"),
+        MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, 
database, "T");
+        action.withSourceTable("S")
+                .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+                .withMatchedUpsert(
+                        "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, 
last_action = 'matched_upsert'")
+                .withMatchedDelete("S.v IS NULL")
+                .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
+                .withNotMatchedBySourceUpsert(
+                        "dt < '02-28'", "v = v || '_nmu', last_action = 
'not_matched_upsert'")
+                .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+        // delete records are filtered
+        validateActionRunResult(
+                action.build(),
                 Arrays.asList(
-                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
                         changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
                         changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
                         changelogRow("+U", 7, "Seven", "matched_upsert", 
"02-28"),
                         changelogRow("+I", 8, "v_8", "insert", "02-29"),
                         changelogRow("+I", 11, "v_11", "insert", "02-29"),
-                        changelogRow("+I", 12, "Test", "$", "02-29")));
+                        changelogRow("+I", 12, "v_12", "insert", "02-29")),
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert", 
"02-27"),
+                        changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert", 
"02-27"),
+                        changelogRow("+I", 4, "v_4", "creation", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+I", 7, "Seven", "matched_upsert", 
"02-28"),
+                        changelogRow("+I", 8, "v_8", "creation", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                        changelogRow("+I", 12, "v_12", "insert", "02-29")));
+
+        // test partial update still works after action
+        insertInto(
+                "T",
+                "(12, CAST (NULL AS STRING), '$', '02-29')",
+                "(12, 'Test', CAST (NULL AS STRING), '02-29')");
+
+        testBatchRead(
+                "SELECT * FROM T WHERE k = 12",
+                Collections.singletonList(changelogRow("+I", 12, "Test", "$", 
"02-29")));
     }
 
     @ParameterizedTest(name = "in-default = {0}")
@@ -553,7 +583,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
                             {
                                 put(CHANGELOG_PRODUCER.key(), 
producer.toString());
                                 // test works with partial update normally
-                                if (producer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
+                                if (producer == 
CoreOptions.ChangelogProducer.LOOKUP) {
                                     put(
                                             CoreOptions.MERGE_ENGINE.key(),
                                             
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());

Reply via email to