This is an automated email from the ASF dual-hosted git repository.

liming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e37b49ca8b [flink] supports the log system to use the insert-only format. (#5062)
e37b49ca8b is described below

commit e37b49ca8b84e49534449f05b22702dcaa3e5da0
Author: liming.1018 <[email protected]>
AuthorDate: Wed Feb 12 20:59:06 2025 +0800

    [flink] supports the log system to use the insert-only format. (#5062)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  7 +++
 .../paimon/flink/kafka/KafkaLogStoreFactory.java   |  4 +-
 .../flink/kafka/KafkaLogSerializationTest.java     | 66 +++++++++++++++++++++-
 .../paimon/flink/kafka/KafkaLogTestUtils.java      | 21 ++++++-
 .../paimon/flink/log/LogStoreTableFactory.java     | 21 +++++--
 .../flink/sink/RowDataStoreWriteOperator.java      |  9 ++-
 .../paimon/flink/source/BaseDataTableSource.java   |  5 ++
 7 files changed, 119 insertions(+), 14 deletions(-)
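
The patch adds a new table option, log.ignore-delete, so that a table with a primary
key can pair the log system with an insert-only value format (for example json) and
drop retraction records instead of failing validation. Below is a minimal,
self-contained sketch of the decision the patch adds to
LogStoreTableFactory#getValueDecodingFormat and #getValueEncodingFormat; it is plain
Java with no Paimon or Flink dependencies, only the option key and its default come
from the patch, and the class and variable names are illustrative.

// Illustrative sketch only; not part of the commit.
import java.util.HashMap;
import java.util.Map;

public class LogIgnoreDeleteSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("log.format", "json");         // an insert-only value format
        tableOptions.put("log.ignore-delete", "true");  // new option, defaults to false

        boolean hasPrimaryKey = true;
        boolean ignoreDelete = Boolean.parseBoolean(
                tableOptions.getOrDefault("log.ignore-delete", "false"));

        // An insert-only value format is accepted when the table has no primary key,
        // or when deletes are explicitly ignored for the log system.
        boolean insertOnlyAllowed = !hasPrimaryKey || ignoreDelete;
        System.out.println("insert-only value format allowed: " + insertOnlyAllowed);
    }
}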

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index abc1616087..ab84aed36f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -805,6 +805,13 @@ public class CoreOptions implements Serializable {
                     .defaultValue("debezium-json")
                     .withDescription("Specify the message format of log system.");
 
+    @ExcludeFromDocumentation("Confused without log system")
+    public static final ConfigOption<Boolean> LOG_IGNORE_DELETE =
+            key("log.ignore-delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Specify whether the log system ignores delete records.");
+
     public static final ConfigOption<Boolean> AUTO_CREATE =
             key("auto-create")
                     .booleanType()
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 8a9ec42a25..f382a8cf18 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -87,7 +87,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                             .createRuntimeDecoder(sourceContext, keyType);
         }
         DeserializationSchema<RowData> valueDeserializer =
-                LogStoreTableFactory.getValueDecodingFormat(helper)
+                LogStoreTableFactory.getValueDecodingFormat(helper, primaryKey.length != 0)
                         .createRuntimeDecoder(sourceContext, physicalType);
         Options options = toOptions(helper.getOptions());
         Long timestampMills = options.get(SCAN_TIMESTAMP_MILLIS);
@@ -127,7 +127,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                             .createRuntimeEncoder(sinkContext, keyType);
         }
         SerializationSchema<RowData> valueSerializer =
-                LogStoreTableFactory.getValueEncodingFormat(helper)
+                LogStoreTableFactory.getValueEncodingFormat(helper, primaryKey.length != 0)
                         .createRuntimeEncoder(sinkContext, physicalType);
         Options options = toOptions(helper.getOptions());
         return new KafkaLogSinkProvider(
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
index 9d8e2295a8..28a746c307 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.types.RowKind;
 
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.util.Collector;
@@ -30,8 +31,13 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.paimon.CoreOptions.LOG_FORMAT;
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
 import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
 import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
 import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.testContext;
@@ -69,6 +75,50 @@ public class KafkaLogSerializationTest {
         checkNonKeyed(LogChangelogMode.ALL, -1, 5, 3);
     }
 
+    @Test
+    public void testNonKeyedWithInsertOnlyFormat() throws Exception {
+        check(
+                LogChangelogMode.AUTO,
+                false,
+                -1,
+                3,
+                5,
+                RowKind.INSERT,
+                Collections.singletonMap(LOG_FORMAT.key(), "json"));
+        check(
+                LogChangelogMode.AUTO,
+                false,
+                -1,
+                3,
+                5,
+                RowKind.UPDATE_AFTER,
+                Collections.singletonMap(LOG_FORMAT.key(), "json"));
+    }
+
+    @Test
+    public void testKeyedWithInsertOnlyFormat() throws Exception {
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(LOG_FORMAT.key(), "json");
+
+        assertThatThrownBy(
+                        () ->
+                                check(
+                                        LogChangelogMode.AUTO,
+                                        true,
+                                        -1,
+                                        3,
+                                        5,
+                                        RowKind.INSERT,
+                                        dynamicOptions))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "A value format should deal with all records. But json has a changelog mode of [INSERT]");
+
+        dynamicOptions.put(LOG_IGNORE_DELETE.key(), "true");
+        check(LogChangelogMode.AUTO, true, -1, 3, 5, RowKind.INSERT, dynamicOptions);
+        check(LogChangelogMode.AUTO, true, -1, 3, 5, RowKind.UPDATE_AFTER, dynamicOptions);
+    }
+
     private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value)
             throws Exception {
         check(mode, true, bucket, key, value, RowKind.INSERT);
@@ -88,11 +138,23 @@ public class KafkaLogSerializationTest {
     private void check(
             LogChangelogMode mode, boolean keyed, int bucket, int key, int value, RowKind rowKind)
             throws Exception {
+        check(mode, keyed, bucket, key, value, rowKind, Collections.emptyMap());
+    }
+
+    private void check(
+            LogChangelogMode mode,
+            boolean keyed,
+            int bucket,
+            int key,
+            int value,
+            RowKind rowKind,
+            Map<String, String> dynamicOptions)
+            throws Exception {
         KafkaLogSerializationSchema serializer =
-                createTestSerializationSchema(testContext("", mode, keyed));
+                createTestSerializationSchema(testContext("", mode, keyed, dynamicOptions));
         serializer.open(null);
         KafkaRecordDeserializationSchema<RowData> deserializer =
-                createTestDeserializationSchema(testContext("", mode, keyed));
+                createTestDeserializationSchema(testContext("", mode, keyed, dynamicOptions));
         deserializer.open(null);
 
         SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
index 375cbb5c43..ef57941c52 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
@@ -175,7 +175,21 @@ public class KafkaLogTestUtils {
 
     public static DynamicTableFactory.Context testContext(
             String servers, LogChangelogMode changelogMode, boolean keyed) {
-        return testContext("table", servers, changelogMode, LogConsistency.TRANSACTIONAL, keyed);
+        return testContext(servers, changelogMode, keyed, Collections.emptyMap());
+    }
+
+    public static DynamicTableFactory.Context testContext(
+            String servers,
+            LogChangelogMode changelogMode,
+            boolean keyed,
+            Map<String, String> dynamicOptions) {
+        return testContext(
+                "table",
+                servers,
+                changelogMode,
+                LogConsistency.TRANSACTIONAL,
+                keyed,
+                dynamicOptions);
     }
 
     static DynamicTableFactory.Context testContext(
@@ -183,7 +197,8 @@ public class KafkaLogTestUtils {
             String servers,
             LogChangelogMode changelogMode,
             LogConsistency consistency,
-            boolean keyed) {
+            boolean keyed,
+            Map<String, String> dynamicOptions) {
         return testContext(
                 name,
                 servers,
@@ -191,7 +206,7 @@ public class KafkaLogTestUtils {
                 consistency,
                 RowType.of(new IntType(), new IntType()),
                 keyed ? new int[] {0} : new int[0],
-                new HashMap<>());
+                dynamicOptions);
     }
 
     public static DynamicTableFactory.Context testContext(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index 324b6de2ff..4b15dc8369 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -45,6 +45,7 @@ import org.apache.flink.types.RowKind;
 import javax.annotation.Nullable;
 
 import static org.apache.paimon.CoreOptions.LOG_FORMAT;
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
 import static org.apache.paimon.CoreOptions.LOG_KEY_FORMAT;
 
 /**
@@ -100,6 +101,12 @@ public interface LogStoreTableFactory extends Factory {
                 .defaultValue(LOG_FORMAT.defaultValue());
     }
 
+    static ConfigOption<Boolean> logIgnoreDelete() {
+        return ConfigOptions.key(LOG_IGNORE_DELETE.key())
+                .booleanType()
+                .defaultValue(LOG_IGNORE_DELETE.defaultValue());
+    }
+
     static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String identifier) {
         return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class, identifier);
     }
@@ -121,18 +128,20 @@ public interface LogStoreTableFactory extends Factory {
     }
 
     static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
-            FlinkTableFactoryHelper helper) {
+            FlinkTableFactoryHelper helper, boolean hasPrimaryKey) {
         DecodingFormat<DeserializationSchema<RowData>> format =
                 helper.discoverDecodingFormat(DeserializationFormatFactory.class, logFormat());
-        validateValueFormat(format, helper.getOptions().get(logFormat()));
+        boolean insertOnly = !hasPrimaryKey || helper.getOptions().get(logIgnoreDelete());
+        validateValueFormat(format, helper.getOptions().get(logFormat()), insertOnly);
         return format;
     }
 
     static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
-            FlinkTableFactoryHelper helper) {
+            FlinkTableFactoryHelper helper, boolean hasPrimaryKey) {
         EncodingFormat<SerializationSchema<RowData>> format =
                 helper.discoverEncodingFormat(SerializationFormatFactory.class, logFormat());
-        validateValueFormat(format, helper.getOptions().get(logFormat()));
+        boolean insertOnly = !hasPrimaryKey || helper.getOptions().get(logIgnoreDelete());
+        validateValueFormat(format, helper.getOptions().get(logFormat()), insertOnly);
         return format;
     }
 
@@ -146,8 +155,8 @@ public interface LogStoreTableFactory extends Factory {
         }
     }
 
-    static void validateValueFormat(Format format, String name) {
-        if (!format.getChangelogMode().equals(ChangelogMode.all())) {
+    static void validateValueFormat(Format format, String name, boolean insertOnly) {
+        if (!insertOnly && !format.getChangelogMode().equals(ChangelogMode.all())) {
             throw new ValidationException(
                     String.format(
                             "A value format should deal with all records. "
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 8009bec967..bd849f9d3e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.log.LogWriteCallback;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 
@@ -49,6 +50,8 @@ import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
+
 /** A {@link PrepareCommitOperator} to write {@link InternalRow}. Record schema is fixed. */
 public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
 
@@ -57,6 +60,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
     @Nullable private final LogSinkFunction logSinkFunction;
     private transient SimpleContext sinkContext;
     @Nullable private transient LogWriteCallback logCallback;
+    private transient boolean logIgnoreDelete;
 
     /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
     private long currentWatermark = Long.MIN_VALUE;
@@ -97,6 +101,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
             openFunction(logSinkFunction);
             logCallback = new LogWriteCallback();
             logSinkFunction.setWriteCallback(logCallback);
+            logIgnoreDelete = Options.fromMap(table.options()).get(LOG_IGNORE_DELETE);
         }
     }
 
@@ -139,7 +144,9 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
             throw new IOException(e);
         }
 
-        if (record != null && logSinkFunction != null) {
+        if (record != null
+                && logSinkFunction != null
+                && (!logIgnoreDelete || record.row().getRowKind().isAdd())) {
             // write to log store, need to preserve original pk (which includes partition fields)
             SinkRecord logRecord = write.toLogRecord(record);
             logSinkFunction.invoke(logRecord, sinkContext);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 836c1372f7..aaad71977d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -65,6 +65,7 @@ import java.util.stream.IntStream;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
@@ -149,6 +150,10 @@ public abstract class BaseDataTableSource extends FlinkTableSource
                 return ChangelogMode.all();
             }
 
+            if (logStoreTableFactory != null && options.get(LOG_IGNORE_DELETE)) {
+                return ChangelogMode.insertOnly();
+            }
+
             // optimization: transaction consistency and all changelog mode avoid the generation of
             // normalized nodes. See FlinkTableSink.getChangelogMode validation.
             return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
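
On the write path, the patch skips retraction records before they reach the log sink
when log.ignore-delete is enabled. Below is a minimal sketch of that filter; it
assumes Paimon's org.apache.paimon.types.RowKind is on the classpath (isAdd() is the
same call the patch uses in RowDataStoreWriteOperator#processElement), and the class
and method names are illustrative, not part of the commit.

// Illustrative sketch only; mirrors the condition added to processElement:
//   record != null && logSinkFunction != null
//       && (!logIgnoreDelete || record.row().getRowKind().isAdd())
import org.apache.paimon.types.RowKind;

public class LogDeleteFilterSketch {

    static boolean shouldWriteToLog(boolean logIgnoreDelete, RowKind kind) {
        // Retractions (UPDATE_BEFORE / DELETE) are dropped only when the option is on;
        // otherwise every record still goes to the log store.
        return !logIgnoreDelete || kind.isAdd();
    }

    public static void main(String[] args) {
        System.out.println(shouldWriteToLog(true, RowKind.DELETE));        // false: delete dropped
        System.out.println(shouldWriteToLog(true, RowKind.UPDATE_BEFORE)); // false: retraction dropped
        System.out.println(shouldWriteToLog(true, RowKind.UPDATE_AFTER));  // true
        System.out.println(shouldWriteToLog(false, RowKind.DELETE));       // true: option off, keep all
    }
}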
