This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fcefe0475 [flink] Introduce process functions and sinks for CDC
database syncing (#896)
fcefe0475 is described below
commit fcefe04752da48c612f6288bda3ae006fb4a25e1
Author: tsreaper <[email protected]>
AuthorDate: Thu Apr 13 18:35:01 2023 +0800
[flink] Introduce process functions and sinks for CDC database syncing
(#896)
---
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 5 +
.../action/cdc/mysql/MySqlSyncTableAction.java | 6 +-
...va => CdcMultiTableParsingProcessFunction.java} | 46 ++++-
.../flink/sink/cdc/CdcParsingProcessFunction.java | 3 +
.../apache/paimon/flink/sink/cdc/EventParser.java | 2 +
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 123 +++++++++++++
...lder.java => FlinkCdcSyncTableSinkBuilder.java} | 15 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 203 +++++++++++++++++++++
...TCase.java => FlinkCdcSyncTableSinkITCase.java} | 102 +----------
.../apache/paimon/flink/sink/cdc/TestCdcEvent.java | 23 ++-
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 5 +
.../flink/sink/cdc/TestCdcSourceFunction.java | 24 +--
.../apache/paimon/flink/sink/cdc/TestTable.java | 172 +++++++++++++++++
13 files changed, 592 insertions(+), 137 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 0704e5f87..ca839f142 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -95,6 +95,11 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
}
}
+ /**
+ * Returns the {@code source.table} field of the current Debezium payload. Only this field is
+ * read, so the returned name carries no database qualifier.
+ *
+ * <p>NOTE(review): assuming {@code payload} is a Jackson {@code JsonNode}, {@code get} returns
+ * null for a missing field, so an event without {@code source.table} would fail here with a
+ * NullPointerException — confirm every incoming event carries it.
+ */
+ @Override
+ public String tableName() {
+ return payload.get("source").get("table").asText();
+ }
+
private void updateFieldTypes(JsonNode schema) {
mySqlFieldTypes = new HashMap<>();
fieldClassNames = new HashMap<>();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 885b9dd1b..79e10fda4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -25,7 +25,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.flink.sink.cdc.FlinkCdcSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncTableSinkBuilder;
import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -167,8 +167,8 @@ public class MySqlSyncTableAction implements Action {
parserFactory = MySqlDebeziumJsonEventParser::new;
}
- FlinkCdcSinkBuilder<String> sinkBuilder =
- new FlinkCdcSinkBuilder<String>()
+ FlinkCdcSyncTableSinkBuilder<String> sinkBuilder =
+ new FlinkCdcSyncTableSinkBuilder<String>()
.withInput(
env.fromSource(
source,
WatermarkStrategy.noWatermarks(), "MySQL Source"))
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
similarity index 52%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index 357c9996d..36e61b443 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -26,42 +26,68 @@ import
org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* A {@link ProcessFunction} to parse CDC change event to either {@link
SchemaChange} or {@link
- * CdcRecord} and send them to different downstreams.
+ * CdcRecord} and send them to different side outputs according to table name.
+ *
+ * <p>This {@link ProcessFunction} can handle records for different tables at
the same time.
*
* @param <T> CDC change event type
*/
-public class CdcParsingProcessFunction<T> extends ProcessFunction<T,
CdcRecord> {
-
- public static final OutputTag<SchemaChange> SCHEMA_CHANGE_OUTPUT_TAG =
- new OutputTag<>("schema-change",
TypeInformation.of(SchemaChange.class));
+public class CdcMultiTableParsingProcessFunction<T> extends ProcessFunction<T,
Void> {
private final EventParser.Factory<T> parserFactory;
private transient EventParser<T> parser;
+ private transient Map<String, OutputTag<SchemaChange>>
schemaChangeOutputTags;
+ private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;
- public CdcParsingProcessFunction(EventParser.Factory<T> parserFactory) {
+ public CdcMultiTableParsingProcessFunction(EventParser.Factory<T>
parserFactory) {
this.parserFactory = parserFactory;
}
@Override
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
+ // Side-output tags are created lazily per table name (see the get*OutputTag helpers) and
+ // cached here for the lifetime of this function instance.
+ schemaChangeOutputTags = new HashMap<>();
+ recordOutputTags = new HashMap<>();
}
@Override
- public void processElement(T raw, Context context, Collector<CdcRecord>
collector)
- throws Exception {
+ public void processElement(T raw, Context context, Collector<Void>
collector) throws Exception {
parser.setRawEvent(raw);
+ String tableName = parser.tableName();
+
if (parser.isSchemaChange()) {
for (SchemaChange schemaChange : parser.getSchemaChanges()) {
- context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
+ context.output(getSchemaChangeOutputTag(tableName),
schemaChange);
}
} else {
for (CdcRecord record : parser.getRecords()) {
- collector.collect(record);
+ context.output(getRecordOutputTag(tableName), record);
}
}
}
+
+ /**
+ * Returns the side-output tag for schema changes of {@code tableName}, creating it on first
+ * use and caching it in {@code schemaChangeOutputTags}.
+ */
+ private OutputTag<SchemaChange> getSchemaChangeOutputTag(String tableName)
{
+ return schemaChangeOutputTags.computeIfAbsent(
+ tableName,
CdcMultiTableParsingProcessFunction::createSchemaChangeOutputTag);
+ }
+
+ /**
+ * Creates the side-output tag that carries schema changes for {@code tableName}; its id is
+ * {@code "schema-change-" + tableName}.
+ *
+ * <p>Public and static so that downstream builders can recreate an equal tag (Flink matches
+ * side outputs by tag id) and read this table's schema changes.
+ */
+ public static OutputTag<SchemaChange> createSchemaChangeOutputTag(String
tableName) {
+ return new OutputTag<>(
+ "schema-change-" + tableName,
TypeInformation.of(SchemaChange.class));
+ }
+
+ /**
+ * Returns the side-output tag for records of {@code tableName}, creating it on first use and
+ * caching it in {@code recordOutputTags}.
+ */
+ private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
+ return recordOutputTags.computeIfAbsent(
+ tableName,
CdcMultiTableParsingProcessFunction::createRecordOutputTag);
+ }
+
+ /**
+ * Creates the side-output tag that carries {@link CdcRecord}s for {@code tableName}; its id is
+ * {@code "record-" + tableName}.
+ *
+ * <p>Public and static so that downstream builders can recreate an equal tag and read this
+ * table's records.
+ */
+ public static OutputTag<CdcRecord> createRecordOutputTag(String tableName)
{
+ return new OutputTag<>("record-" + tableName,
TypeInformation.of(CdcRecord.class));
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index 357c9996d..ba3f97cad 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -30,6 +30,9 @@ import org.apache.flink.util.OutputTag;
* A {@link ProcessFunction} to parse CDC change event to either {@link
SchemaChange} or {@link
* CdcRecord} and send them to different downstreams.
*
+ * <p>This {@link ProcessFunction} can only handle records for a single
constant table. To handle
+ * records for different tables, see {@link
CdcMultiTableParsingProcessFunction}.
+ *
* @param <T> CDC change event type
*/
public class CdcParsingProcessFunction<T> extends ProcessFunction<T,
CdcRecord> {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index 9e575a338..e4f192d1b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -32,6 +32,8 @@ public interface EventParser<T> {
void setRawEvent(T rawEvent);
+ /**
+ * Returns the name of the table that the event most recently passed to {@link #setRawEvent}
+ * belongs to. Multi-table consumers use it to route the parsed output per table.
+ */
+ String tableName();
+
boolean isSchemaChange();
List<SchemaChange> getSchemaChanges();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
new file mode 100644
index 000000000..e48f929bc
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.flink.sink.BucketingStreamPartitioner;
+import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder for {@link FlinkCdcSink} when syncing the whole database into one Paimon database. Each
+ * database table will be written into a separate Paimon table.
+ *
+ * <p>Raw CDC events are parsed once by a {@link CdcMultiTableParsingProcessFunction}, which emits
+ * schema changes and records into side outputs keyed by table name. For every table given to
+ * {@link #withTables}, this builder reads the matching side outputs and wires a schema-change
+ * operator plus a dedicated sink.
+ *
+ * <p>This builder will create a separate sink for each Paimon sink table. Thus this implementation
+ * is not very efficient in resource saving.
+ *
+ * @param <T> CDC change event type
+ */
+public class FlinkCdcSyncDatabaseSinkBuilder<T> {
+
+ // Required: raw CDC events to sync (checked in build()).
+ private DataStream<T> input = null;
+ // Required: creates the parser that extracts table name, schema changes and records.
+ private EventParser.Factory<T> parserFactory = null;
+ // Paimon tables to write; each one is matched against parsed events by table.name().
+ private List<FileStoreTable> tables = new ArrayList<>();
+ private Lock.Factory lockFactory = Lock.emptyFactory();
+
+ // When null, the record partitioning step keeps the parallelism it already has.
+ @Nullable private Integer parallelism;
+
+ /** Sets the stream of raw CDC change events. Required. */
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
+ this.input = input;
+ return this;
+ }
+
+ /** Sets the factory of the parser used to decode raw events. Required. */
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withParserFactory(
+ EventParser.Factory<T> parserFactory) {
+ this.parserFactory = parserFactory;
+ return this;
+ }
+
+ /**
+ * Sets the Paimon tables to sync into. Defaults to an empty list, in which case build() wires no
+ * sinks. NOTE(review): passing null makes build() fail with a NullPointerException.
+ */
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withTables(List<FileStoreTable>
tables) {
+ this.tables = tables;
+ return this;
+ }
+
+ /** Sets the lock factory handed to every sink. Defaults to {@code Lock.emptyFactory()}. */
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withLockFactory(Lock.Factory
lockFactory) {
+ this.lockFactory = lockFactory;
+ return this;
+ }
+
+ /** Sets the parallelism of each table's record partitioning step; null keeps the default. */
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withParallelism(@Nullable
Integer parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ /**
+ * Adds the parsing operator and, per table, a schema-change operator and a sink to the input's
+ * execution environment. Nothing runs here; the caller still has to execute the environment.
+ *
+ * <p>NOTE(review): events are routed by the parser's {@code tableName()}, which must equal
+ * {@code table.name()} exactly. Events for a table that is not listed in {@link #withTables}
+ * are emitted to side outputs that nothing here reads.
+ *
+ * @throws NullPointerException if the input or the parser factory has not been set
+ */
+ public void build() {
+ Preconditions.checkNotNull(input);
+ Preconditions.checkNotNull(parserFactory);
+
+ StreamExecutionEnvironment env = input.getExecutionEnvironment();
+
+ // forward() requires equal parallelism on both sides, hence the explicit setParallelism.
+ SingleOutputStreamOperator<Void> parsed =
+ input.forward()
+ .process(new
CdcMultiTableParsingProcessFunction<>(parserFactory))
+ .setParallelism(input.getParallelism());
+
+ for (FileStoreTable table : tables) {
+ DataStream<Void> schemaChangeProcessFunction =
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed,
+
CdcMultiTableParsingProcessFunction.createSchemaChangeOutputTag(
+ table.name()))
+ .process(
+ new SchemaChangeProcessFunction(
+ new SchemaManager(table.fileIO(),
table.location())));
+ // A single task (max parallelism 1, so it cannot be rescaled) applies this table's
+ // schema changes, one at a time.
+ schemaChangeProcessFunction.getTransformation().setParallelism(1);
+
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+
+ // Redistribute this table's records with a channel computer built from its schema
+ // before they reach the table's own sink.
+ BucketingStreamPartitioner<CdcRecord> partitioner =
+ new BucketingStreamPartitioner<>(new
CdcRecordChannelComputer(table.schema()));
+ PartitionTransformation<CdcRecord> partitioned =
+ new PartitionTransformation<>(
+ SingleOutputStreamOperatorUtils.getSideOutput(
+ parsed,
+ CdcMultiTableParsingProcessFunction
+
.createRecordOutputTag(table.name()))
+ .getTransformation(),
+ partitioner);
+ if (parallelism != null) {
+ partitioned.setParallelism(parallelism);
+ }
+
+ FlinkCdcSink sink = new FlinkCdcSink(table, lockFactory);
+ sink.sinkFrom(new DataStream<>(env, partitioned));
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
similarity index 84%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
index 6663acc54..7c770b045 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
@@ -34,11 +34,11 @@ import
org.apache.flink.streaming.api.transformations.PartitionTransformation;
import javax.annotation.Nullable;
/**
- * Builder for {@link FlinkCdcSink}.
+ * Builder for {@link FlinkCdcSink} when syncing multiple database tables into
one Paimon table.
*
* @param <T> CDC change event type
*/
-public class FlinkCdcSinkBuilder<T> {
+public class FlinkCdcSyncTableSinkBuilder<T> {
private DataStream<T> input = null;
private EventParser.Factory<T> parserFactory = null;
@@ -47,27 +47,27 @@ public class FlinkCdcSinkBuilder<T> {
@Nullable private Integer parallelism;
- public FlinkCdcSinkBuilder<T> withInput(DataStream<T> input) {
+ public FlinkCdcSyncTableSinkBuilder<T> withInput(DataStream<T> input) {
this.input = input;
return this;
}
- public FlinkCdcSinkBuilder<T> withParserFactory(EventParser.Factory<T>
parserFactory) {
+ public FlinkCdcSyncTableSinkBuilder<T>
withParserFactory(EventParser.Factory<T> parserFactory) {
this.parserFactory = parserFactory;
return this;
}
- public FlinkCdcSinkBuilder<T> withTable(FileStoreTable table) {
+ public FlinkCdcSyncTableSinkBuilder<T> withTable(FileStoreTable table) {
this.table = table;
return this;
}
- public FlinkCdcSinkBuilder<T> withLockFactory(Lock.Factory lockFactory) {
+ public FlinkCdcSyncTableSinkBuilder<T> withLockFactory(Lock.Factory
lockFactory) {
this.lockFactory = lockFactory;
return this;
}
- public FlinkCdcSinkBuilder<T> withParallelism(@Nullable Integer
parallelism) {
+ public FlinkCdcSyncTableSinkBuilder<T> withParallelism(@Nullable Integer
parallelism) {
this.parallelism = parallelism;
return this;
}
@@ -89,6 +89,7 @@ public class FlinkCdcSinkBuilder<T> {
new SchemaChangeProcessFunction(
new SchemaManager(table.fileIO(),
table.location())));
schemaChangeProcessFunction.getTransformation().setParallelism(1);
+ schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
BucketingStreamPartitioner<CdcRecord> partitioner =
new BucketingStreamPartitioner<>(new
CdcRecordChannelComputer(table.schema()));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
new file mode 100644
index 000000000..5eab8d1e5
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogUtils;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * IT cases for {@link FlinkCdcSyncDatabaseSinkBuilder}: randomly generated CDC events for one to
+ * three tables are synced into separate Paimon tables and the final contents are verified.
+ */
+public class FlinkCdcSyncDatabaseSinkITCase extends AbstractTestBase {
+
+ private static final String DATABASE_NAME = "test";
+ private static final String TABLE_NAME = "test_tbl";
+
+ @TempDir java.nio.file.Path tempDir;
+
+ /**
+ * Generates random record and schema-change events per table, interleaves them into a single
+ * source, runs the sync job (with injected file-system failures on a random half of the runs)
+ * and checks every table against its expected result.
+ */
+ @Test
+ @Timeout(120)
+ public void testRandomCdcEvents() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ int numTables = random.nextInt(3) + 1;
+ // When true, tables live on FailingFileIO and the job must survive injected I/O failures.
+ boolean enableFailure = random.nextBoolean();
+
+ int maxEvents = 1500;
+ int maxSchemaChanges = 10;
+ int maxPartitions = 3;
+ int maxKeys = 150;
+ int maxBuckets = 5;
+
+ String failingName = UUID.randomUUID().toString();
+
+ List<TestTable> testTables = new ArrayList<>();
+ List<FileStoreTable> fileStoreTables = new ArrayList<>();
+ for (int i = 0; i < numTables; i++) {
+ String tableName = TABLE_NAME + i;
+ TestTable testTable =
+ new TestTable(
+ tableName,
+ random.nextInt(maxEvents) + 1,
+ random.nextInt(maxSchemaChanges) + 1,
+ random.nextInt(maxPartitions) + 1,
+ random.nextInt(maxKeys) + 1);
+ testTables.add(testTable);
+
+ Path tablePath;
+ FileIO fileIO;
+ if (enableFailure) {
+ tablePath =
+ new Path(
+ FailingFileIO.getFailingPath(
+ failingName,
+ CatalogUtils.stringifyPath(
+ tempDir.toString(),
DATABASE_NAME, tableName)));
+ fileIO = new FailingFileIO();
+ } else {
+ // NOTE(review): the path uses the TraceableFileIO scheme while fileIO is a
+ // LocalFileIO — confirm this pairing is intended.
+ tablePath =
+ new Path(
+ TraceableFileIO.SCHEME
+ + "://"
+ + CatalogUtils.stringifyPath(
+ tempDir.toString(),
DATABASE_NAME, tableName));
+ fileIO = LocalFileIO.create();
+ }
+
+ // no failure when creating table
+ FailingFileIO.reset(failingName, 0, 1);
+
+ FileStoreTable fileStoreTable =
+ createFileStoreTable(
+ tablePath,
+ fileIO,
+ testTable.initialRowType(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ random.nextInt(maxBuckets) + 1);
+ fileStoreTables.add(fileStoreTable);
+ }
+
+ // Note: this drains the events held by every TestTable.
+ List<TestCdcEvent> events = mergeTestTableEvents(testTables);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ // Checkpoint every 100 ms, presumably so that sink commits happen often in this short run.
+ env.getCheckpointConfig().setCheckpointInterval(100);
+ // Without injected failures any restart would indicate a bug, so fail fast instead.
+ if (!enableFailure) {
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
+ TestCdcSourceFunction sourceFunction = new
TestCdcSourceFunction(events);
+ DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
+ source.setParallelism(2);
+ new FlinkCdcSyncDatabaseSinkBuilder<TestCdcEvent>()
+ .withInput(source)
+ .withParserFactory(TestCdcEventParser::new)
+ .withTables(fileStoreTables)
+ // because we have at most 3 tables and 8 slots in AbstractTestBase,
+ // each table can only get 2 slots
+ .withParallelism(2)
+ .build();
+
+ // enable failure when running jobs if needed
+ FailingFileIO.reset(failingName, 10, 10000);
+
+ env.execute();
+
+ // no failure when checking results
+ FailingFileIO.reset(failingName, 0, 1);
+
+ for (int i = 0; i < numTables; i++) {
+ // Reload with the latest schema, since schema changes were applied during the job.
+ FileStoreTable table =
fileStoreTables.get(i).copyWithLatestSchema();
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ TableSchema schema = schemaManager.latest().get();
+
+ TableScan.Plan plan = table.newScan().plan();
+ try (RecordReaderIterator<InternalRow> it =
+ new
RecordReaderIterator<>(table.newRead().createReader(plan))) {
+ testTables.get(i).assertResult(schema, it);
+ }
+ }
+ }
+
+ /**
+ * Creates a Paimon table at {@code tablePath} with the given row type, partition keys, primary
+ * keys and bucket count. The write buffer is only three 4 KiB pages, presumably to force
+ * frequent flushes during the test.
+ */
+ private FileStoreTable createFileStoreTable(
+ Path tablePath,
+ FileIO fileIO,
+ RowType rowType,
+ List<String> partitions,
+ List<String> primaryKeys,
+ int numBucket)
+ throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, numBucket);
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(fileIO, tablePath),
+ new Schema(rowType.getFields(), partitions,
primaryKeys, conf.toMap(), ""));
+ return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+ }
+
+ /**
+ * Randomly interleaves the events of all tables into one list. Each table's own events keep
+ * their original relative order: the shuffle only decides which table the next event comes
+ * from.
+ *
+ * <p>Side effect: events are taken with {@code poll()}, so every TestTable's {@code events()}
+ * collection is drained afterwards.
+ */
+ private List<TestCdcEvent> mergeTestTableEvents(List<TestTable>
testTables) {
+ // One entry per event, holding the index of the table that owns it.
+ List<Integer> toShuffle = new ArrayList<>();
+ for (int i = 0; i < testTables.size(); i++) {
+ for (int j = 0; j < testTables.get(i).events().size(); j++) {
+ toShuffle.add(i);
+ }
+ }
+ Collections.shuffle(toShuffle);
+
+ List<TestCdcEvent> events = new ArrayList<>();
+ for (int idx : toShuffle) {
+ events.add(testTables.get(idx).events().poll());
+ }
+ return events;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
similarity index 56%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 7be475e3c..45d6b34e5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -28,16 +28,12 @@ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.TableScan;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.TraceableFileIO;
@@ -49,21 +45,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** IT cases for {@link FlinkCdcSink}. */
-public class FlinkCdcSinkITCase extends AbstractTestBase {
+/** IT cases for {@link FlinkCdcSyncTableSinkBuilder}. */
+public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
@TempDir java.nio.file.Path tempDir;
@@ -79,63 +68,8 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
int numBucket = random.nextInt(5) + 1;
boolean enableFailure = random.nextBoolean();
- TestCdcEvent[] events = new TestCdcEvent[numEvents];
-
- Set<Integer> schemaChangePositions = new HashSet<>();
- for (int i = 0; i < numSchemaChanges; i++) {
- int pos;
- do {
- pos = random.nextInt(numEvents);
- } while (schemaChangePositions.contains(pos));
- schemaChangePositions.add(pos);
- }
-
- Map<Integer, Map<String, String>> expected = new HashMap<>();
- List<String> fieldNames = new ArrayList<>();
- List<Boolean> isBigInt = new ArrayList<>();
- fieldNames.add("v0");
- isBigInt.add(false);
- int suffixId = 0;
- for (int i = 0; i < numEvents; i++) {
- if (schemaChangePositions.contains(i)) {
- if (random.nextBoolean()) {
- int idx = random.nextInt(fieldNames.size());
- isBigInt.set(idx, true);
- events[i] =
- new TestCdcEvent(
- SchemaChange.updateColumnType(
- fieldNames.get(idx),
DataTypes.BIGINT()));
- } else {
- suffixId++;
- String newName = "v" + suffixId;
- fieldNames.add(newName);
- isBigInt.add(false);
- events[i] = new
TestCdcEvent(SchemaChange.addColumn(newName, DataTypes.INT()));
- }
- } else {
- Map<String, String> fields = new HashMap<>();
- int key = random.nextInt(numKeys);
- fields.put("k", String.valueOf(key));
- int pt = key % numPartitions;
- fields.put("pt", String.valueOf(pt));
- for (int j = 0; j < fieldNames.size(); j++) {
- String fieldName = fieldNames.get(j);
- if (isBigInt.get(j)) {
- fields.put(fieldName,
String.valueOf(random.nextLong()));
- } else {
- fields.put(fieldName,
String.valueOf(random.nextInt()));
- }
- }
-
- List<CdcRecord> records = new ArrayList<>();
- if (expected.containsKey(key)) {
- records.add(new CdcRecord(RowKind.DELETE,
expected.get(key)));
- }
- records.add(new CdcRecord(RowKind.INSERT, fields));
- events[i] = new TestCdcEvent(records);
- expected.put(key, fields);
- }
- }
+ TestTable testTable =
+ new TestTable("test_tbl", numEvents, numSchemaChanges,
numPartitions, numKeys);
Path tablePath;
FileIO fileIO;
@@ -155,9 +89,7 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
createFileStoreTable(
tablePath,
fileIO,
- RowType.of(
- new DataType[] {DataTypes.INT(),
DataTypes.INT(), DataTypes.INT()},
- new String[] {"pt", "k", "v0"}),
+ testTable.initialRowType(),
Collections.singletonList("pt"),
Arrays.asList("pt", "k"),
numBucket);
@@ -168,12 +100,10 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
env.setRestartStrategy(RestartStrategies.noRestart());
}
- TestCdcSourceFunction sourceFunction =
- new TestCdcSourceFunction(
- events, record ->
Integer.valueOf(record.fields().get("k")));
+ TestCdcSourceFunction sourceFunction = new
TestCdcSourceFunction(testTable.events());
DataStreamSource<TestCdcEvent> source = env.addSource(sourceFunction);
source.setParallelism(2);
- new FlinkCdcSinkBuilder<TestCdcEvent>()
+ new FlinkCdcSyncTableSinkBuilder<TestCdcEvent>()
.withInput(source)
.withParserFactory(TestCdcEventParser::new)
.withTable(table)
@@ -192,27 +122,11 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
TableSchema schema = schemaManager.latest().get();
- Map<Integer, Map<String, String>> actual = new HashMap<>();
TableScan.Plan plan = table.newScan().plan();
try (RecordReaderIterator<InternalRow> it =
new
RecordReaderIterator<>(table.newRead().createReader(plan))) {
- while (it.hasNext()) {
- InternalRow row = it.next();
- Map<String, String> fields = new HashMap<>();
- for (int i = 0; i < schema.fieldNames().size(); i++) {
- if (!row.isNullAt(i)) {
- fields.put(
- schema.fieldNames().get(i),
- String.valueOf(
-
schema.fields().get(i).type().equals(DataTypes.BIGINT())
- ? row.getLong(i)
- : row.getInt(i)));
- }
- }
- actual.put(Integer.valueOf(fields.get("k")), fields);
- }
+ testTable.assertResult(schema, it);
}
- assertThat(actual).isEqualTo(expected);
}
private FileStoreTable createFileStoreTable(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index aedcc0362..f7a933951 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -28,17 +28,27 @@ public class TestCdcEvent implements Serializable {
private static final long serialVersionUID = 1L;
+ private final String tableName;
private final SchemaChange schemaChange;
private final List<CdcRecord> records;
+ private final int keyHash;
- public TestCdcEvent(SchemaChange schemaChange) {
+ public TestCdcEvent(String tableName, SchemaChange schemaChange) {
+ this.tableName = tableName;
this.schemaChange = schemaChange;
this.records = null;
+ this.keyHash = 0;
}
- public TestCdcEvent(List<CdcRecord> records) {
+ public TestCdcEvent(String tableName, List<CdcRecord> records, int
keyHash) {
+ this.tableName = tableName;
this.schemaChange = null;
this.records = records;
+ this.keyHash = keyHash;
+ }
+
+ /** Returns the name of the table this test event belongs to. */
+ public String tableName() {
+ return tableName;
}
public SchemaChange schemaChange() {
@@ -49,8 +59,15 @@ public class TestCdcEvent implements Serializable {
return records;
}
+ /**
+ * Returns the key hash supplied at construction (0 for schema-change events).
+ *
+ * <p>NOTE(review): hashCode is overridden without equals, so equality stays identity-based.
+ * This presumably exists so that TestCdcSourceFunction can route events by key (its explicit
+ * getKeyHash function was removed in this patch) — confirm, and consider a dedicated accessor
+ * instead of overloading hashCode.
+ */
+ @Override
+ public int hashCode() {
+ return keyHash;
+ }
+
@Override
public String toString() {
- return String.format("{schemChange = %s, records = %s}", schemaChange,
records);
+ return String.format(
+ "{tableName = %s, schemChange = %s, records = %s}",
+ tableName, schemaChange, records);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index bb544d24c..9772d3292 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -33,6 +33,11 @@ public class TestCdcEventParser implements
EventParser<TestCdcEvent> {
this.raw = raw;
}
+ /** Returns the table name carried by the wrapped {@link TestCdcEvent}. */
+ @Override
+ public String tableName() {
+ return raw.tableName();
+ }
+
@Override
public boolean isSchemaChange() {
return raw.schemaChange() != null;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
index 889c8216d..507fa033d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -18,9 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SerializableFunction;
-
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -28,11 +25,10 @@ import
org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
/**
* Testing {@link RichParallelSourceFunction} to produce {@link TestCdcEvent}.
{@link TestCdcEvent}s
@@ -44,17 +40,14 @@ public class TestCdcSourceFunction extends
RichParallelSourceFunction<TestCdcEve
private static final long serialVersionUID = 1L;
private final LinkedList<TestCdcEvent> events;
- private final SerializableFunction<CdcRecord, Integer> getKeyHash;
private volatile boolean isRunning = true;
private transient int numRecordsPerCheckpoint;
private transient AtomicInteger recordsThisCheckpoint;
private transient ListState<Integer> remainingEventsCount;
- public TestCdcSourceFunction(
- TestCdcEvent[] events, SerializableFunction<CdcRecord, Integer>
getKeyHash) {
- this.events =
Arrays.stream(events).collect(Collectors.toCollection(LinkedList::new));
- this.getKeyHash = getKeyHash;
+ /**
+ * @param events events to replay, polled from the head in iteration order; the collection is
+ * copied, so later changes to it are not seen by this source. Record events are emitted only
+ * by the subtask selected from their {@link TestCdcEvent#hashCode()}.
+ */
+ public TestCdcSourceFunction(Collection<TestCdcEvent> events) {
+ this.events = new LinkedList<>(events);
}
@Override
@@ -95,18 +88,9 @@ public class TestCdcSourceFunction extends
RichParallelSourceFunction<TestCdcEve
synchronized (ctx.getCheckpointLock()) {
TestCdcEvent event = events.poll();
if (event.records() != null) {
- for (int i = 0; i + 1 < event.records().size(); i++) {
- Preconditions.checkArgument(
- getKeyHash
- .apply(event.records().get(i))
-
.equals(getKeyHash.apply(event.records().get(i + 1))),
- "Key hashes in the same List<Record> are not
equal."
- + "This is an invalid test data.");
- }
- int hash = getKeyHash.apply(event.records().get(0));
int subtaskId =
getRuntimeContext().getIndexOfThisSubtask();
int totalSubtasks =
getRuntimeContext().getNumberOfParallelSubtasks();
- if (Math.abs(hash) % totalSubtasks != subtaskId) {
+ if (Math.abs(event.hashCode()) % totalSubtasks !=
subtaskId) {
continue;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
new file mode 100644
index 000000000..f5be3dca3
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Generates test data for {@link FlinkCdcSyncTableSinkITCase} and {@link
+ * FlinkCdcSyncDatabaseSinkITCase}.
+ *
+ * <p>A {@code TestTable} starts with an INT column {@code pt}, an INT column {@code k} and one to
+ * five value columns {@code v0, v1, ...}, each randomly INT or BIGINT. It produces a random
+ * sequence of {@link TestCdcEvent}s (value-column widenings to BIGINT, INT column additions and
+ * delete/insert record pairs) together with the final state those events should leave behind,
+ * one row per value of {@code k}, which {@link #assertResult} checks against.
+ */
+public class TestTable {
+
+    private final RowType initialRowType;
+
+    /** Generated events, in generation order. */
+    private final Queue<TestCdcEvent> events;
+
+    /** Expected final row per value of {@code k}, as column name to string value. */
+    private final Map<Integer, Map<String, String>> expected;
+
+    /**
+     * Generates a random table layout and a random stream of CDC events for it.
+     *
+     * @param tableName name attached to every generated event
+     * @param numEvents total number of events to generate, schema changes included
+     * @param numSchemaChanges requested number of schema-change events; capped at {@code
+     *     numEvents / 2}
+     * @param numPartitions modulus used to derive {@code pt} from {@code k}; must be positive
+     * @param numKeys keys are drawn from {@code [0, numKeys)}; must be positive
+     * @throws IllegalArgumentException if {@code numPartitions} or {@code numKeys} is not positive
+     */
+    public TestTable(
+            String tableName, int numEvents, int numSchemaChanges, int numPartitions, int numKeys) {
+        // Fail fast with a clear message instead of an ArithmeticException from
+        // "key % numPartitions" or an IllegalArgumentException from "random.nextInt(numKeys)"
+        // deep inside the generation loop.
+        if (numPartitions <= 0) {
+            throw new IllegalArgumentException(
+                    "numPartitions must be positive, but is " + numPartitions);
+        }
+        if (numKeys <= 0) {
+            throw new IllegalArgumentException("numKeys must be positive, but is " + numKeys);
+        }
+
+        List<String> fieldNames = new ArrayList<>();
+        List<Boolean> isBigInt = new ArrayList<>();
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numFields = random.nextInt(5) + 1;
+
+        DataType[] initialFieldTypes = new DataType[2 + numFields];
+        initialFieldTypes[0] = DataTypes.INT();
+        initialFieldTypes[1] = DataTypes.INT();
+
+        String[] initialFieldNames = new String[2 + numFields];
+        initialFieldNames[0] = "pt";
+        initialFieldNames[1] = "k";
+
+        for (int i = 0; i < numFields; i++) {
+            fieldNames.add("v" + i);
+            initialFieldNames[2 + i] = "v" + i;
+
+            if (random.nextBoolean()) {
+                isBigInt.add(true);
+                initialFieldTypes[2 + i] = DataTypes.BIGINT();
+            } else {
+                isBigInt.add(false);
+                initialFieldTypes[2 + i] = DataTypes.INT();
+            }
+        }
+
+        initialRowType = RowType.of(initialFieldTypes, initialFieldNames);
+
+        // Pick distinct event positions for the schema changes by rejection sampling. The loop
+        // could never terminate if more positions were requested than exist, so the request is
+        // capped at half of numEvents, which also keeps the number of retries small.
+        Set<Integer> schemaChangePositions = new HashSet<>();
+        numSchemaChanges = Math.min(numSchemaChanges, numEvents / 2);
+        for (int i = 0; i < numSchemaChanges; i++) {
+            int pos;
+            do {
+                pos = random.nextInt(numEvents);
+            } while (schemaChangePositions.contains(pos));
+            schemaChangePositions.add(pos);
+        }
+
+        events = new LinkedList<>();
+        expected = new HashMap<>();
+        for (int i = 0; i < numEvents; i++) {
+            if (schemaChangePositions.contains(i)) {
+                if (random.nextBoolean()) {
+                    // Widen a random value column to BIGINT (it may already be BIGINT).
+                    int idx = random.nextInt(fieldNames.size());
+                    isBigInt.set(idx, true);
+                    events.add(
+                            new TestCdcEvent(
+                                    tableName,
+                                    SchemaChange.updateColumnType(
+                                            fieldNames.get(idx), DataTypes.BIGINT())));
+                } else {
+                    // Append a new INT value column.
+                    String newName = "v" + fieldNames.size();
+                    fieldNames.add(newName);
+                    isBigInt.add(false);
+                    events.add(
+                            new TestCdcEvent(
+                                    tableName, SchemaChange.addColumn(newName, DataTypes.INT())));
+                }
+            } else {
+                Map<String, String> fields = new HashMap<>();
+                int key = random.nextInt(numKeys);
+                fields.put("k", String.valueOf(key));
+                // pt is a function of k, so every key always lands in the same partition.
+                int pt = key % numPartitions;
+                fields.put("pt", String.valueOf(pt));
+
+                for (int j = 0; j < fieldNames.size(); j++) {
+                    String fieldName = fieldNames.get(j);
+                    if (isBigInt.get(j)) {
+                        fields.put(fieldName, String.valueOf(random.nextLong()));
+                    } else {
+                        fields.put(fieldName, String.valueOf(random.nextInt()));
+                    }
+                }
+
+                // Retract the key's previous row (if any) and insert the new one in one event.
+                // The key hash depends only on (tableName, key), so all events of a key share it.
+                List<CdcRecord> records = new ArrayList<>();
+                if (expected.containsKey(key)) {
+                    records.add(new CdcRecord(RowKind.DELETE, expected.get(key)));
+                }
+                records.add(new CdcRecord(RowKind.INSERT, fields));
+                events.add(new TestCdcEvent(tableName, records, Objects.hash(tableName, key)));
+                expected.put(key, fields);
+            }
+        }
+    }
+
+    /** Returns the row type before any of the generated schema changes is applied. */
+    public RowType initialRowType() {
+        return initialRowType;
+    }
+
+    /** Returns the generated events; this is the internal queue, not a copy. */
+    public Queue<TestCdcEvent> events() {
+        return events;
+    }
+
+    /**
+     * Asserts that the rows read back from the table match the expected final state.
+     *
+     * <p>Each row is converted to a column-name to string-value map. Null columns are omitted,
+     * so rows written before a column was added compare equal to their expected maps. Fails if
+     * two rows share the same {@code k}, because the expected state holds one row per key.
+     *
+     * @param schema latest table schema, used to name and type each row's columns
+     * @param it rows read from the table
+     */
+    public void assertResult(TableSchema schema, Iterator<InternalRow> it) {
+        Map<Integer, Map<String, String>> actual = new HashMap<>();
+        while (it.hasNext()) {
+            InternalRow row = it.next();
+            Map<String, String> fields = new HashMap<>();
+            for (int i = 0; i < schema.fieldNames().size(); i++) {
+                if (!row.isNullAt(i)) {
+                    fields.put(
+                            schema.fieldNames().get(i),
+                            String.valueOf(
+                                    schema.fields().get(i).type().equals(DataTypes.BIGINT())
+                                            ? row.getLong(i)
+                                            : row.getInt(i)));
+                }
+            }
+            Integer key = Integer.valueOf(fields.get("k"));
+            // A second row for the same key means the sink produced a duplicate; fail instead of
+            // letting the later row silently replace the earlier one.
+            Map<String, String> previous = actual.put(key, fields);
+            assertThat(previous).as("duplicate row for key k = %s", key).isNull();
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+}