This is an automated email from the ASF dual-hosted git repository.
liming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e37b49ca8b [flink] supports the log system to use the insert-only format. (#5062)
e37b49ca8b is described below
commit e37b49ca8b84e49534449f05b22702dcaa3e5da0
Author: liming.1018 <[email protected]>
AuthorDate: Wed Feb 12 20:59:06 2025 +0800
[flink] supports the log system to use the insert-only format. (#5062)
---
.../main/java/org/apache/paimon/CoreOptions.java | 7 +++
.../paimon/flink/kafka/KafkaLogStoreFactory.java | 4 +-
.../flink/kafka/KafkaLogSerializationTest.java | 66 +++++++++++++++++++++-
.../paimon/flink/kafka/KafkaLogTestUtils.java | 21 ++++++-
.../paimon/flink/log/LogStoreTableFactory.java | 21 +++++--
.../flink/sink/RowDataStoreWriteOperator.java | 9 ++-
.../paimon/flink/source/BaseDataTableSource.java | 5 ++
7 files changed, 119 insertions(+), 14 deletions(-)
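In short, this change lets a primary-key table pair its log system with an insert-only value format such as "json" by declaring that delete records may be dropped from the log. A minimal sketch of the table options this enables, assuming a Paimon table whose log system is Kafka; the option map below is illustrative only and not taken from the commit:

    import java.util.HashMap;
    import java.util.Map;

    public class InsertOnlyLogOptionsSketch {
        public static void main(String[] args) {
            // Hypothetical table options for a primary-key table. Before this commit,
            // 'log.format' = 'json' was rejected for such tables because json cannot
            // encode UPDATE_BEFORE/DELETE records; 'log.ignore-delete' = 'true' opts in
            // to dropping those records from the log system instead.
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("log.system", "kafka");
            tableOptions.put("log.format", "json");
            tableOptions.put("log.ignore-delete", "true");
            tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }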
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index abc1616087..ab84aed36f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -805,6 +805,13 @@ public class CoreOptions implements Serializable {
.defaultValue("debezium-json")
.withDescription("Specify the message format of log
system.");
+ @ExcludeFromDocumentation("Confused without log system")
+ public static final ConfigOption<Boolean> LOG_IGNORE_DELETE =
+ key("log.ignore-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Specify whether the log system ignores
delete records.");
+
public static final ConfigOption<Boolean> AUTO_CREATE =
key("auto-create")
.booleanType()
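The new LOG_IGNORE_DELETE option defaults to false and is read back from the table's option map, as the RowDataStoreWriteOperator change further down does. A minimal sketch of that lookup, assuming a plain Map<String, String> of table options (the map here is hypothetical):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    import java.util.Collections;
    import java.util.Map;

    public class ReadLogIgnoreDeleteSketch {
        public static void main(String[] args) {
            // Hypothetical table options; a real table would carry many more entries.
            Map<String, String> tableOptions =
                    Collections.singletonMap(CoreOptions.LOG_IGNORE_DELETE.key(), "true");

            // Options.fromMap(...).get(option) falls back to the default (false) when unset.
            boolean logIgnoreDelete =
                    Options.fromMap(tableOptions).get(CoreOptions.LOG_IGNORE_DELETE);
            System.out.println("log.ignore-delete = " + logIgnoreDelete);
        }
    }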
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 8a9ec42a25..f382a8cf18 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -87,7 +87,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
.createRuntimeDecoder(sourceContext, keyType);
}
DeserializationSchema<RowData> valueDeserializer =
- LogStoreTableFactory.getValueDecodingFormat(helper)
+ LogStoreTableFactory.getValueDecodingFormat(helper, primaryKey.length != 0)
.createRuntimeDecoder(sourceContext, physicalType);
Options options = toOptions(helper.getOptions());
Long timestampMills = options.get(SCAN_TIMESTAMP_MILLIS);
@@ -127,7 +127,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
.createRuntimeEncoder(sinkContext, keyType);
}
SerializationSchema<RowData> valueSerializer =
- LogStoreTableFactory.getValueEncodingFormat(helper)
+ LogStoreTableFactory.getValueEncodingFormat(helper, primaryKey.length != 0)
.createRuntimeEncoder(sinkContext, physicalType);
Options options = toOptions(helper.getOptions());
return new KafkaLogSinkProvider(
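The factory now tells LogStoreTableFactory whether the table has a primary key. Append-only tables never emit retractions, so an insert-only format is always acceptable for them; primary-key tables additionally need the new opt-in. A small sketch of that decision, mirroring the LogStoreTableFactory change further down (method and variable names are illustrative, not from the commit):

    public class InsertOnlyDecisionSketch {
        // Mirrors the decision LogStoreTableFactory now makes: an insert-only value
        // format is acceptable when the table has no primary key (append-only) or when
        // the user opted in with log.ignore-delete.
        static boolean insertOnly(boolean hasPrimaryKey, boolean logIgnoreDelete) {
            return !hasPrimaryKey || logIgnoreDelete;
        }

        public static void main(String[] args) {
            System.out.println(insertOnly(false, false)); // append-only table -> true
            System.out.println(insertOnly(true, false));  // pk table, default  -> false
            System.out.println(insertOnly(true, true));   // pk table, opted in -> true
        }
    }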
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
index 9d8e2295a8..28a746c307 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.types.RowKind;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.util.Collector;
@@ -30,8 +31,13 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.paimon.CoreOptions.LOG_FORMAT;
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.testContext;
@@ -69,6 +75,50 @@ public class KafkaLogSerializationTest {
checkNonKeyed(LogChangelogMode.ALL, -1, 5, 3);
}
+ @Test
+ public void testNonKeyedWithInsertOnlyFormat() throws Exception {
+ check(
+ LogChangelogMode.AUTO,
+ false,
+ -1,
+ 3,
+ 5,
+ RowKind.INSERT,
+ Collections.singletonMap(LOG_FORMAT.key(), "json"));
+ check(
+ LogChangelogMode.AUTO,
+ false,
+ -1,
+ 3,
+ 5,
+ RowKind.UPDATE_AFTER,
+ Collections.singletonMap(LOG_FORMAT.key(), "json"));
+ }
+
+ @Test
+ public void testKeyedWithInsertOnlyFormat() throws Exception {
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(LOG_FORMAT.key(), "json");
+
+ assertThatThrownBy(
+ () ->
+ check(
+ LogChangelogMode.AUTO,
+ true,
+ -1,
+ 3,
+ 5,
+ RowKind.INSERT,
+ dynamicOptions))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "A value format should deal with all records. But json
has a changelog mode of [INSERT]");
+
+ dynamicOptions.put(LOG_IGNORE_DELETE.key(), "true");
+ check(LogChangelogMode.AUTO, true, -1, 3, 5, RowKind.INSERT, dynamicOptions);
+ check(LogChangelogMode.AUTO, true, -1, 3, 5, RowKind.UPDATE_AFTER, dynamicOptions);
+ }
+
private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value)
throws Exception {
check(mode, true, bucket, key, value, RowKind.INSERT);
@@ -88,11 +138,23 @@ public class KafkaLogSerializationTest {
private void check(
LogChangelogMode mode, boolean keyed, int bucket, int key, int value, RowKind rowKind)
throws Exception {
+ check(mode, keyed, bucket, key, value, rowKind, Collections.emptyMap());
+ }
+
+ private void check(
+ LogChangelogMode mode,
+ boolean keyed,
+ int bucket,
+ int key,
+ int value,
+ RowKind rowKind,
+ Map<String, String> dynamicOptions)
+ throws Exception {
KafkaLogSerializationSchema serializer =
- createTestSerializationSchema(testContext("", mode, keyed));
+ createTestSerializationSchema(testContext("", mode, keyed, dynamicOptions));
serializer.open(null);
KafkaRecordDeserializationSchema<RowData> deserializer =
- createTestDeserializationSchema(testContext("", mode, keyed));
+ createTestDeserializationSchema(testContext("", mode, keyed, dynamicOptions));
deserializer.open(null);
SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
index 375cbb5c43..ef57941c52 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
@@ -175,7 +175,21 @@ public class KafkaLogTestUtils {
public static DynamicTableFactory.Context testContext(
String servers, LogChangelogMode changelogMode, boolean keyed) {
- return testContext("table", servers, changelogMode,
LogConsistency.TRANSACTIONAL, keyed);
+ return testContext(servers, changelogMode, keyed,
Collections.emptyMap());
+ }
+
+ public static DynamicTableFactory.Context testContext(
+ String servers,
+ LogChangelogMode changelogMode,
+ boolean keyed,
+ Map<String, String> dynamicOptions) {
+ return testContext(
+ "table",
+ servers,
+ changelogMode,
+ LogConsistency.TRANSACTIONAL,
+ keyed,
+ dynamicOptions);
}
static DynamicTableFactory.Context testContext(
@@ -183,7 +197,8 @@ public class KafkaLogTestUtils {
String servers,
LogChangelogMode changelogMode,
LogConsistency consistency,
- boolean keyed) {
+ boolean keyed,
+ Map<String, String> dynamicOptions) {
return testContext(
name,
servers,
@@ -191,7 +206,7 @@ public class KafkaLogTestUtils {
consistency,
RowType.of(new IntType(), new IntType()),
keyed ? new int[] {0} : new int[0],
- new HashMap<>());
+ dynamicOptions);
}
public static DynamicTableFactory.Context testContext(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index 324b6de2ff..4b15dc8369 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -45,6 +45,7 @@ import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
import static org.apache.paimon.CoreOptions.LOG_FORMAT;
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
import static org.apache.paimon.CoreOptions.LOG_KEY_FORMAT;
/**
@@ -100,6 +101,12 @@ public interface LogStoreTableFactory extends Factory {
.defaultValue(LOG_FORMAT.defaultValue());
}
+ static ConfigOption<Boolean> logIgnoreDelete() {
+ return ConfigOptions.key(LOG_IGNORE_DELETE.key())
+ .booleanType()
+ .defaultValue(LOG_IGNORE_DELETE.defaultValue());
+ }
+
static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String identifier) {
return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class, identifier);
}
@@ -121,18 +128,20 @@ public interface LogStoreTableFactory extends Factory {
}
static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
- FlinkTableFactoryHelper helper) {
+ FlinkTableFactoryHelper helper, boolean hasPrimaryKey) {
DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(DeserializationFormatFactory.class, logFormat());
- validateValueFormat(format, helper.getOptions().get(logFormat()));
+ boolean insertOnly = !hasPrimaryKey || helper.getOptions().get(logIgnoreDelete());
+ validateValueFormat(format, helper.getOptions().get(logFormat()), insertOnly);
return format;
}
static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
- FlinkTableFactoryHelper helper) {
+ FlinkTableFactoryHelper helper, boolean hasPrimaryKey) {
EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, logFormat());
- validateValueFormat(format, helper.getOptions().get(logFormat()));
+ boolean insertOnly = !hasPrimaryKey || helper.getOptions().get(logIgnoreDelete());
+ validateValueFormat(format, helper.getOptions().get(logFormat()), insertOnly);
return format;
}
@@ -146,8 +155,8 @@ public interface LogStoreTableFactory extends Factory {
}
}
- static void validateValueFormat(Format format, String name) {
- if (!format.getChangelogMode().equals(ChangelogMode.all())) {
+ static void validateValueFormat(Format format, String name, boolean insertOnly) {
+ if (!insertOnly && !format.getChangelogMode().equals(ChangelogMode.all())) {
throw new ValidationException(
String.format(
"A value format should deal with all records. "
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 8009bec967..bd849f9d3e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.log.LogWriteCallback;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
@@ -49,6 +50,8 @@ import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
+
/** A {@link PrepareCommitOperator} to write {@link InternalRow}. Record schema is fixed. */
public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
@@ -57,6 +60,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
@Nullable private final LogSinkFunction logSinkFunction;
private transient SimpleContext sinkContext;
@Nullable private transient LogWriteCallback logCallback;
+ private transient boolean logIgnoreDelete;
/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
private long currentWatermark = Long.MIN_VALUE;
@@ -97,6 +101,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
openFunction(logSinkFunction);
logCallback = new LogWriteCallback();
logSinkFunction.setWriteCallback(logCallback);
+ logIgnoreDelete = Options.fromMap(table.options()).get(LOG_IGNORE_DELETE);
}
}
@@ -139,7 +144,9 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
throw new IOException(e);
}
- if (record != null && logSinkFunction != null) {
+ if (record != null
+ && logSinkFunction != null
+ && (!logIgnoreDelete || record.row().getRowKind().isAdd())) {
// write to log store, need to preserve original pk (which includes partition fields)
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
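On the write path the operator now skips non-add rows before invoking the log sink function, so deletes and update-befores still reach the file store but never the log system. A small sketch of the guard, assuming Paimon's RowKind enum; the helper method itself is illustrative:

    import org.apache.paimon.types.RowKind;

    public class LogWriteFilterSketch {
        // Mirrors the new guard in processElement: with log.ignore-delete enabled,
        // only "add" rows (INSERT / UPDATE_AFTER) are forwarded to the log system;
        // UPDATE_BEFORE and DELETE rows go to the file store only.
        static boolean writeToLog(boolean logIgnoreDelete, RowKind kind) {
            return !logIgnoreDelete || kind.isAdd();
        }

        public static void main(String[] args) {
            for (RowKind kind : RowKind.values()) {
                System.out.println(kind + " -> " + writeToLog(true, kind));
            }
        }
    }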
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 836c1372f7..aaad71977d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -65,6 +65,7 @@ import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
@@ -149,6 +150,10 @@ public abstract class BaseDataTableSource extends FlinkTableSource
return ChangelogMode.all();
}
+ if (logStoreTableFactory != null &&
options.get(LOG_IGNORE_DELETE)) {
+ return ChangelogMode.insertOnly();
+ }
+
// optimization: transaction consistency and all changelog mode avoid the generation of
// normalized nodes. See FlinkTableSink.getChangelogMode validation.
return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
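On the read side, a configured log store together with log.ignore-delete makes the source advertise an insert-only changelog, so downstream operators do not expect retraction records. A reduced sketch of just the new branch, using Flink's ChangelogMode; the surrounding batch/streaming checks of getChangelogMode are elided and stood in for by all():

    import org.apache.flink.table.connector.ChangelogMode;

    public class SourceChangelogModeSketch {
        // Only the branch added by this commit; everything else that
        // BaseDataTableSource#getChangelogMode evaluates is represented by all().
        static ChangelogMode changelogMode(boolean hasLogStore, boolean logIgnoreDelete) {
            if (hasLogStore && logIgnoreDelete) {
                return ChangelogMode.insertOnly();
            }
            return ChangelogMode.all();
        }

        public static void main(String[] args) {
            System.out.println(changelogMode(true, true));  // insert-only
            System.out.println(changelogMode(true, false)); // falls through to the other checks
        }
    }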