This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ecce2cc87 [FLINK-31434] Introduce CDC sink for Table Store (#674)
ecce2cc87 is described below
commit ecce2cc876c1bd9095abef4fdf24400bf179fefa
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 22 14:52:51 2023 +0800
[FLINK-31434] Introduce CDC sink for Table Store (#674)
---
.../main/java/org/apache/paimon/cdc/CdcRecord.java | 5 +
.../java/org/apache/paimon/cdc/EventParser.java | 46 +++++
.../apache/paimon/flink/sink/CompactorSink.java | 2 +-
.../apache/paimon/flink/sink/FileStoreSink.java | 2 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 9 +-
.../flink/sink/cdc/CdcBucketStreamPartitioner.java | 141 ++++++++++++++
.../flink/sink/cdc/CdcParsingProcessFunction.java | 69 +++++++
.../{FileStoreSink.java => cdc/FlinkCdcSink.java} | 32 ++--
.../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java | 116 ++++++++++++
.../sink/cdc/SchemaChangeProcessFunction.java | 64 +++++++
.../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java | 204 +++++++++++++++++++++
.../apache/paimon/flink/sink/cdc/TestCdcEvent.java | 45 +++--
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 52 ++++++
.../flink/sink/cdc/TestCdcSourceFunction.java | 110 +++++++++++
14 files changed, 853 insertions(+), 44 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
index 3c65dec4e..4e80002af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
+++ b/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
@@ -55,4 +55,9 @@ public class CdcRecord implements Serializable {
CdcRecord that = (CdcRecord) o;
return Objects.equals(kind, that.kind) && Objects.equals(fields,
that.fields);
}
+
+ @Override
+ public String toString() {
+ return kind.shortString() + " " + fields;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
b/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
new file mode 100644
index 000000000..46c744699
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.cdc;
+
+import org.apache.paimon.schema.SchemaChange;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Parse a CDC change event to a list of {@link SchemaChange} or {@link
CdcRecord}.
+ *
+ * @param <T> CDC change event type
+ */
+public interface EventParser<T> {
+
+ void setRawEvent(T rawEvent);
+
+ boolean isSchemaChange();
+
+ List<SchemaChange> getSchemaChanges();
+
+ List<CdcRecord> getRecords();
+
+ /** Factory to create an {@link EventParser}. */
+ interface Factory<T> extends Serializable {
+
+ EventParser<T> create();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index 166de1eae..ee26918a9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.util.function.SerializableFunction;
/** {@link FlinkSink} for dedicated compact jobs. */
-public class CompactorSink extends FlinkSink {
+public class CompactorSink extends FlinkSink<RowData> {
private static final long serialVersionUID = 1L;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 5e6cf4181..b8d5bf2af 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
import java.util.Map;
/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkSink {
+public class FileStoreSink extends FlinkSink<RowData> {
private static final long serialVersionUID = 1L;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 5db1ad11a..0e12cbef5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -35,7 +35,6 @@ import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.util.function.SerializableFunction;
import java.io.Serializable;
@@ -45,7 +44,7 @@ import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_F
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
/** Abstract sink of paimon. */
-public abstract class FlinkSink implements Serializable {
+public abstract class FlinkSink<T> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -91,7 +90,7 @@ public abstract class FlinkSink implements Serializable {
new StoreSinkWriteImpl(table, context, initialCommitUser,
ioManager, isOverwrite);
}
- public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
+ public DataStreamSink<?> sinkFrom(DataStream<T> input) {
// This commitUser is valid only for new jobs.
// After the job starts, this commitUser will be recorded into the
states of write and
// commit operators.
@@ -102,7 +101,7 @@ public abstract class FlinkSink implements Serializable {
}
public DataStreamSink<?> sinkFrom(
- DataStream<RowData> input, String commitUser,
StoreSinkWrite.Provider sinkProvider) {
+ DataStream<T> input, String commitUser, StoreSinkWrite.Provider
sinkProvider) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
@@ -150,7 +149,7 @@ public abstract class FlinkSink implements Serializable {
+ " to exactly-once");
}
- protected abstract OneInputStreamOperator<RowData, Committable>
createWriteOperator(
+ protected abstract OneInputStreamOperator<T, Committable>
createWriteOperator(
StoreSinkWrite.Provider writeProvider, boolean isStreaming);
protected abstract SerializableFunction<String, Committer>
createCommitterFactory(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
new file mode 100644
index 000000000..ef852be7f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.BucketComputer;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link StreamPartitioner} which partitions {@link CdcRecord}s according
to the hash value of
+ * bucket keys (or primary keys if bucket keys are not specified).
+ */
+public class CdcBucketStreamPartitioner extends StreamPartitioner<CdcRecord> {
+
+ private final List<String> bucketKeys;
+ private final DataType[] bucketTypes;
+ private final List<String> partitionKeys;
+ private final DataType[] partitionTypes;
+ private final boolean shuffleByPartitionEnable;
+
+ private transient int numberOfChannels;
+ private transient Projection bucketProjection;
+ private transient Projection partitionProjection;
+
+ public CdcBucketStreamPartitioner(TableSchema schema, boolean
shuffleByPartitionEnable) {
+ List<String> bucketKeys = schema.originalBucketKeys();
+ if (bucketKeys.isEmpty()) {
+ bucketKeys = schema.primaryKeys();
+ }
+ Preconditions.checkArgument(
+ bucketKeys.size() > 0, "Either bucket keys or primary keys
must be defined");
+
+ this.bucketKeys = bucketKeys;
+ this.bucketTypes = getTypes(this.bucketKeys, schema);
+ this.partitionKeys = schema.partitionKeys();
+ this.partitionTypes = getTypes(this.partitionKeys, schema);
+ this.shuffleByPartitionEnable = shuffleByPartitionEnable;
+ }
+
+ private DataType[] getTypes(List<String> keys, TableSchema schema) {
+ List<DataType> types = new ArrayList<>();
+ for (String key : keys) {
+ int idx = schema.fieldNames().indexOf(key);
+ types.add(schema.fields().get(idx).type());
+ }
+ return types.toArray(new DataType[0]);
+ }
+
+ @Override
+ public void setup(int numberOfChannels) {
+ super.setup(numberOfChannels);
+ this.numberOfChannels = numberOfChannels;
+ this.bucketProjection =
+ CodeGenUtils.newProjection(
+ RowType.of(bucketTypes), IntStream.range(0,
bucketTypes.length).toArray());
+ this.partitionProjection =
+ CodeGenUtils.newProjection(
+ RowType.of(partitionTypes),
+ IntStream.range(0, partitionTypes.length).toArray());
+ }
+
+ @Override
+ public int selectChannel(
+ SerializationDelegate<StreamRecord<CdcRecord>>
streamRecordSerializationDelegate) {
+ CdcRecord record =
streamRecordSerializationDelegate.getInstance().getValue();
+ BinaryRow bucketKeyRow =
+ toBinaryRow(record.fields(), bucketKeys, bucketTypes,
bucketProjection);
+ if (shuffleByPartitionEnable) {
+ BinaryRow partitionKeyRow =
+ toBinaryRow(
+ record.fields(), partitionKeys, partitionTypes,
partitionProjection);
+ return BucketComputer.bucket(
+ Objects.hash(bucketKeyRow.hashCode(),
partitionKeyRow.hashCode()),
+ numberOfChannels);
+ } else {
+ return BucketComputer.bucket(bucketKeyRow.hashCode(),
numberOfChannels);
+ }
+ }
+
+ private BinaryRow toBinaryRow(
+ Map<String, String> fields,
+ List<String> keys,
+ DataType[] types,
+ Projection projection) {
+ GenericRow genericRow = new GenericRow(keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ genericRow.setField(i,
TypeUtils.castFromString(fields.get(keys.get(i)), types[i]));
+ }
+ return projection.apply(genericRow);
+ }
+
+ @Override
+ public StreamPartitioner<CdcRecord> copy() {
+ return this; // never null: non-transient fields are final, transient ones are rebuilt by setup()
+ }
+
+ @Override
+ public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
+ return SubtaskStateMapper.FULL;
+ }
+
+ @Override
+ public boolean isPointwise() {
+ return false;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
new file mode 100644
index 000000000..223b709ec
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.cdc.EventParser;
+import org.apache.paimon.schema.SchemaChange;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A {@link ProcessFunction} to parse CDC change event to either {@link
SchemaChange} or {@link
+ * CdcRecord} and send them to different downstreams.
+ *
+ * @param <T> CDC change event type
+ */
+public class CdcParsingProcessFunction<T> extends ProcessFunction<T,
CdcRecord> {
+
+ public static final OutputTag<SchemaChange> SCHEMA_CHANGE_OUTPUT_TAG =
+ new OutputTag<>("schema-change",
TypeInformation.of(SchemaChange.class));
+
+ private final EventParser.Factory<T> parserFactory;
+
+ private transient EventParser<T> parser;
+
+ public CdcParsingProcessFunction(EventParser.Factory<T> parserFactory) {
+ this.parserFactory = parserFactory;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ parser = parserFactory.create();
+ }
+
+ @Override
+ public void processElement(T raw, Context context, Collector<CdcRecord>
collector)
+ throws Exception {
+ parser.setRawEvent(raw);
+ if (parser.isSchemaChange()) {
+ for (SchemaChange schemaChange : parser.getSchemaChanges()) {
+ context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
+ }
+ } else {
+ for (CdcRecord record : parser.getRecords()) {
+ collector.collect(record);
+ }
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
similarity index 73%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 5e6cf4181..7981fec6e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -16,45 +16,50 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink;
+package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableStateManager;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.FlinkSink;
+import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.util.function.SerializableFunction;
import javax.annotation.Nullable;
-import java.util.Map;
-
-/** {@link FlinkSink} for writing records into paimon. */
-public class FileStoreSink extends FlinkSink {
+/**
+ * A {@link FlinkSink} which accepts {@link CdcRecord} and waits for a schema
change if necessary.
+ */
+public class FlinkCdcSink extends FlinkSink<CdcRecord> {
private static final long serialVersionUID = 1L;
private final Lock.Factory lockFactory;
- @Nullable private final Map<String, String> overwritePartition;
@Nullable private final LogSinkFunction logSinkFunction;
- public FileStoreSink(
+ public FlinkCdcSink(
FileStoreTable table,
Lock.Factory lockFactory,
- @Nullable Map<String, String> overwritePartition,
@Nullable LogSinkFunction logSinkFunction) {
- super(table, overwritePartition != null);
+ super(table, false);
this.lockFactory = lockFactory;
- this.overwritePartition = overwritePartition;
this.logSinkFunction = logSinkFunction;
}
@Override
- protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
+ protected OneInputStreamOperator<CdcRecord, Committable>
createWriteOperator(
StoreSinkWrite.Provider writeProvider, boolean isStreaming) {
- return new RowDataStoreWriteOperator(table, logSinkFunction,
writeProvider);
+ return new SchemaAwareStoreWriteOperator(table, logSinkFunction,
writeProvider);
}
@Override
@@ -67,7 +72,6 @@ public class FileStoreSink extends FlinkSink {
return user ->
new StoreCommitter(
table.newCommit(user)
- .withOverwrite(overwritePartition)
.withLock(lockFactory.create())
.ignoreEmptyCommit(!streamingCheckpointEnabled));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
new file mode 100644
index 000000000..190deb464
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.cdc.EventParser;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.LogSinkFunction;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+
+import javax.annotation.Nullable;
+
+/**
+ * Builder for {@link FlinkCdcSink}.
+ *
+ * @param <T> CDC change event type
+ */
+public class FlinkCdcSinkBuilder<T> {
+
+ private DataStream<T> input = null;
+ private EventParser.Factory<T> parserFactory = null;
+ private FileStoreTable table = null;
+ private Lock.Factory lockFactory = Lock.emptyFactory();
+ @Nullable private LogSinkFunction logSinkFunction;
+
+ @Nullable private Integer parallelism;
+
+ public FlinkCdcSinkBuilder<T> withInput(DataStream<T> input) {
+ this.input = input;
+ return this;
+ }
+
+ public FlinkCdcSinkBuilder<T> withParserFactory(EventParser.Factory<T>
parserFactory) {
+ this.parserFactory = parserFactory;
+ return this;
+ }
+
+ public FlinkCdcSinkBuilder<T> withTable(FileStoreTable table) {
+ this.table = table;
+ return this;
+ }
+
+ public FlinkCdcSinkBuilder<T> withLockFactory(Lock.Factory lockFactory) {
+ this.lockFactory = lockFactory;
+ return this;
+ }
+
+ public FlinkCdcSinkBuilder<T> withLogSinkFunction(@Nullable
LogSinkFunction logSinkFunction) {
+ this.logSinkFunction = logSinkFunction;
+ return this;
+ }
+
+ public FlinkCdcSinkBuilder<T> withParallelism(@Nullable Integer
parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ public DataStreamSink<?> build() {
+ Preconditions.checkNotNull(input);
+ Preconditions.checkNotNull(parserFactory);
+ Preconditions.checkNotNull(table);
+
+ SingleOutputStreamOperator<CdcRecord> parsed =
+ input.forward()
+ .process(new
CdcParsingProcessFunction<>(parserFactory))
+ .setParallelism(input.getParallelism());
+
+ DataStream<Void> schemaChangeProcessFunction =
+
parsed.getSideOutput(CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
+ .process(
+ new SchemaChangeProcessFunction(
+ new SchemaManager(table.fileIO(),
table.location())));
+ schemaChangeProcessFunction.getTransformation().setParallelism(1);
+
+ CdcBucketStreamPartitioner partitioner =
+ new CdcBucketStreamPartitioner(
+ table.schema(),
+ table.options()
+ .toConfiguration()
+
.get(FlinkConnectorOptions.SINK_SHUFFLE_BY_PARTITION));
+ PartitionTransformation<CdcRecord> partitioned =
+ new PartitionTransformation<>(parsed.getTransformation(),
partitioner);
+ if (parallelism != null) {
+ partitioned.setParallelism(parallelism);
+ }
+
+ StreamExecutionEnvironment env = input.getExecutionEnvironment();
+ FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory,
logSinkFunction);
+ return sink.sinkFrom(new DataStream<>(env, partitioned));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
new file mode 100644
index 000000000..d3ff71e09
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ProcessFunction} to handle {@link SchemaChange}.
+ *
+ * <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link
ProcessFunction} must
+ * be 1.
+ */
+public class SchemaChangeProcessFunction extends ProcessFunction<SchemaChange,
Void> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SchemaChangeProcessFunction.class);
+
+ private final SchemaManager schemaManager;
+
+ public SchemaChangeProcessFunction(SchemaManager schemaManager) {
+ this.schemaManager = schemaManager;
+ }
+
+ @Override
+ public void processElement(
+ SchemaChange schemaChange, Context context, Collector<Void>
collector)
+ throws Exception {
+ Preconditions.checkArgument(
+ schemaChange instanceof SchemaChange.AddColumn,
+ "Currently, only SchemaChange.AddColumn is supported.");
+ try {
+ schemaManager.commitChanges(schemaChange);
+ } catch (Exception e) {
+ // A failure here is expected. For example, when one table is split into
multiple database
+ // tables, each of them reports the same added column, and
schemaManager can't handle
+ // duplicated column adds. NOTE: every exception is caught, so other
failures are hidden too.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to perform schema change {}", schemaChange,
e);
+ }
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
new file mode 100644
index 000000000..eb78c1d07
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link FlinkCdcSink}. */
+public class FlinkCdcSinkITCase extends AbstractTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ @Timeout(60)
+ public void testRandomCdcEvents() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ int numEvents = random.nextInt(20000) + 1;
+ int numSchemaChanges = random.nextInt(20) + 1;
+ int numKeys = random.nextInt(2000) + 1;
+ boolean enableFailure = random.nextBoolean();
+
+ TestCdcEvent[] events = new TestCdcEvent[numEvents];
+
+ Set<Integer> schemaChangePositions = new HashSet<>();
+ for (int i = 0; i < numSchemaChanges; i++) {
+ int pos;
+ do {
+ pos = random.nextInt(numEvents);
+ } while (schemaChangePositions.contains(pos));
+ schemaChangePositions.add(pos);
+ }
+
+ Map<Integer, Map<String, String>> expected = new HashMap<>();
+ List<String> fieldNames = new ArrayList<>();
+ fieldNames.add("v0");
+ int suffixId = 0;
+ for (int i = 0; i < numEvents; i++) {
+ if (schemaChangePositions.contains(i)) {
+ suffixId++;
+ String newName = "v" + suffixId;
+ fieldNames.add(newName);
+ events[i] = new TestCdcEvent(SchemaChange.addColumn(newName,
DataTypes.INT()));
+ } else {
+ Map<String, String> fields = new HashMap<>();
+ int key = random.nextInt(numKeys);
+ fields.put("k", String.valueOf(key));
+ for (String fieldName : fieldNames) {
+ fields.put(fieldName, String.valueOf(random.nextInt()));
+ }
+
+ List<CdcRecord> records = new ArrayList<>();
+ if (expected.containsKey(key)) {
+ records.add(new CdcRecord(RowKind.DELETE,
expected.get(key)));
+ }
+ records.add(new CdcRecord(RowKind.INSERT, fields));
+ events[i] = new TestCdcEvent(records);
+ expected.put(key, fields);
+ }
+ }
+
+ Path tablePath;
+ FileIO fileIO;
+ String failingName = UUID.randomUUID().toString();
+ if (enableFailure) {
+ tablePath = new Path(FailingFileIO.getFailingPath(failingName,
tempDir.toString()));
+ fileIO = new FailingFileIO();
+ } else {
+ tablePath = new Path(TraceableFileIO.SCHEME + "://" +
tempDir.toString());
+ fileIO = LocalFileIO.create();
+ }
+
+ // no failure when creating table
+ FailingFileIO.reset(failingName, 0, 1);
+
+ FileStoreTable table =
+ createFileStoreTable(
+ tablePath,
+ fileIO,
+ RowType.of(
+ new DataType[] {DataTypes.INT(),
DataTypes.INT()},
+ new String[] {"k", "v0"}),
+ Collections.emptyList(),
+ Collections.singletonList("k"));
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getCheckpointConfig().setCheckpointInterval(1000);
+ TestCdcSourceFunction sourceFunction =
+ new TestCdcSourceFunction(
+ events, record ->
Integer.valueOf(record.fields().get("k")));
+ DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
+ source.setParallelism(2);
+ new FlinkCdcSinkBuilder<TestCdcEvent>()
+ .withInput(source)
+ .withParserFactory(TestCdcEventParser::new)
+ .withTable(table)
+ .withParallelism(3)
+ .build();
+
+ // enable failure when running jobs if needed
+ FailingFileIO.reset(failingName, 100, 10000);
+
+ env.execute();
+
+ // no failure when checking results
+ FailingFileIO.reset(failingName, 0, 1);
+
+ table = table.copyWithLatestSchema();
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ TableSchema schema = schemaManager.latest().get();
+
+ Map<Integer, Map<String, String>> actual = new HashMap<>();
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ try (RecordReaderIterator<InternalRow> it =
+ new
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+ while (it.hasNext()) {
+ InternalRow row = it.next();
+ Map<String, String> fields = new HashMap<>();
+ for (int i = 0; i < schema.fieldNames().size(); i++) {
+ if (!row.isNullAt(i)) {
+ fields.put(schema.fieldNames().get(i),
String.valueOf(row.getInt(i)));
+ }
+ }
+ actual.put(Integer.valueOf(fields.get("k")), fields);
+ }
+ }
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private FileStoreTable createFileStoreTable(
+ Path tablePath,
+ FileIO fileIO,
+ RowType rowType,
+ List<String> partitions,
+ List<String> primaryKeys)
+ throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, 3);
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath),
+ new Schema(rowType.getFields(), partitions,
primaryKeys, conf.toMap(), ""));
+ return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
similarity index 52%
copy from paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
copy to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index 3c65dec4e..f65e3002a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -16,43 +16,42 @@
* limitations under the License.
*/
-package org.apache.paimon.cdc;
+package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.types.RowKind;
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.schema.SchemaChange;
import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
+import java.util.List;
-/** A data change message from the CDC source. */
-public class CdcRecord implements Serializable {
+/** Testing CDC change event, built from either one schema change or a list of records. */
+public class TestCdcEvent implements Serializable {
private static final long serialVersionUID = 1L;
- private final RowKind kind;
- private final Map<String, String> fields;
+ private final SchemaChange schemaChange;
+ private final List<CdcRecord> records;
- public CdcRecord(RowKind kind, Map<String, String> fields) {
- this.kind = kind;
- this.fields = fields;
+ public TestCdcEvent(SchemaChange schemaChange) {
+ this.schemaChange = schemaChange;
+ this.records = null; // a schema-change event carries no records
}
- public RowKind kind() {
- return kind;
+ public TestCdcEvent(List<CdcRecord> records) {
+ this.schemaChange = null; // a record event carries no schema change
+ this.records = records;
}
- /** Map key is the field's name, and map value is the field's value. */
- public Map<String, String> fields() {
- return fields;
+ public SchemaChange schemaChange() {
+ return schemaChange;
}
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof CdcRecord)) {
- return false;
- }
+ public List<CdcRecord> records() {
+ return records;
+ }
- CdcRecord that = (CdcRecord) o;
- return Objects.equals(kind, that.kind) && Objects.equals(fields,
that.fields);
+ @Override
+ public String toString() {
+ return String.format("{schemaChange = %s, records = %s}", schemaChange, records);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
new file mode 100644
index 000000000..75d11afa1
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.cdc.EventParser;
+import org.apache.paimon.schema.SchemaChange;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Testing {@link EventParser} for {@link TestCdcEvent}: it hands back what the event holds. */
+public class TestCdcEventParser implements EventParser<TestCdcEvent> {
+
+ private TestCdcEvent raw; // event most recently given to setRawEvent
+
+ @Override
+ public void setRawEvent(TestCdcEvent raw) {
+ this.raw = raw;
+ }
+
+ @Override
+ public boolean isSchemaChange() {
+ return raw.schemaChange() != null; // true when the event holds a schema change
+ }
+
+ @Override
+ public List<SchemaChange> getSchemaChanges() {
+ return Collections.singletonList(raw.schemaChange()); // NOTE(review): wraps null if none
+ }
+
+ @Override
+ public List<CdcRecord> getRecords() {
+ return raw.records(); // returned as is; null for an event built from a SchemaChange
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
new file mode 100644
index 000000000..caa7e46f8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.cdc.CdcRecord;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
+
+/**
+ * Testing {@link RichParallelSourceFunction} to produce {@link TestCdcEvent}. An event with records
+ * is emitted only by the subtask its key hash maps to; any other event by every subtask.
+ */
+public class TestCdcSourceFunction extends RichParallelSourceFunction<TestCdcEvent>
+        implements CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final LinkedList<TestCdcEvent> events; // events this subtask has not consumed yet
+    private final SerializableFunction<CdcRecord, Integer> getKeyHash;
+
+    private volatile boolean isRunning = true;
+    private transient ListState<Integer> remainingEventsCount; // events.size() at each snapshot
+
+    public TestCdcSourceFunction(
+            TestCdcEvent[] events, SerializableFunction<CdcRecord, Integer> getKeyHash) {
+        this.events = Arrays.stream(events).collect(Collectors.toCollection(LinkedList::new));
+        this.getKeyHash = getKeyHash;
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        remainingEventsCount =
+                context.getOperatorStateStore()
+                        .getListState(new ListStateDescriptor<>("count", Integer.class));
+
+        if (context.isRestored()) {
+            int count = 0;
+            for (int c : remainingEventsCount.get()) {
+                count += c;
+            }
+            while (events.size() > count) { // skip the events consumed before the snapshot
+                events.poll();
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        remainingEventsCount.clear(); // keep a single entry: the current number of events left
+        remainingEventsCount.add(events.size());
+    }
+
+    @Override
+    public void run(SourceContext<TestCdcEvent> ctx) throws Exception {
+        while (isRunning && !events.isEmpty()) {
+            synchronized (ctx.getCheckpointLock()) { // no snapshot between poll and collect
+                TestCdcEvent event = events.poll();
+                if (event.records() != null) {
+                    int hash = getKeyHash.apply(event.records().get(0));
+                    for (CdcRecord record : event.records()) {
+                        Preconditions.checkArgument(
+                                getKeyHash.apply(record) == hash,
+                                "Key hashes in the same List<Record> are not equal. "
+                                        + "This is an invalid test data.");
+                    }
+                    int subtaskId = getRuntimeContext().getIndexOfThisSubtask();
+                    int totalSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+                    // floorMod, not Math.abs(hash) % n: Math.abs(Integer.MIN_VALUE) is negative,
+                    // so that hash matched no subtask and its event was never emitted.
+                    if (Math.floorMod(hash, totalSubtasks) != subtaskId) {
+                        continue;
+                    }
+                }
+                ctx.collect(event);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false; // run() checks this flag before taking the next event
+    }
+}