This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aca3fd592 [flink][cdc] Fix that setting both table affix and including
pattern causes table cannot be synchronized in kafka cdc (#2047)
aca3fd592 is described below
commit aca3fd592db147ed88b0520da628cb24c3372522
Author: yuzelin <[email protected]>
AuthorDate: Mon Sep 25 18:24:35 2023 +0800
[flink][cdc] Fix that setting both table affix and including pattern causes
table cannot be synchronized in kafka cdc (#2047)
---
paimon-flink/paimon-flink-cdc/pom.xml | 8 ++++
.../paimon/flink/action/cdc/kafka/KafkaSchema.java | 5 +-
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 12 +++--
.../action/cdc/kafka/KafkaSyncTableAction.java | 9 +---
.../flink/action/cdc/kafka/formats/DataFormat.java | 9 +---
.../action/cdc/kafka/formats/RecordParser.java | 12 ++---
.../cdc/kafka/formats/RecordParserFactory.java | 7 +--
.../cdc/kafka/formats/canal/CanalRecordParser.java | 8 +---
.../kafka/formats/maxwell/MaxwellRecordParser.java | 8 +---
.../cdc/kafka/formats/ogg/OggRecordParser.java | 8 +---
.../action/cdc/mongodb/MongoDBRecordParser.java | 13 ++----
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 15 +++---
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 4 +-
.../flink/sink/cdc/CdcDynamicBucketSink.java | 0
.../sink/cdc/CdcDynamicBucketWriteOperator.java | 0
.../cdc/CdcDynamicTableParsingProcessFunction.java | 0
.../flink/sink/cdc/CdcHashKeyChannelComputer.java | 0
.../cdc/CdcMultiTableParsingProcessFunction.java | 0
.../paimon/flink/sink/cdc/CdcMultiplexRecord.java | 0
.../cdc/CdcMultiplexRecordChannelComputer.java | 0
.../flink/sink/cdc/CdcParsingProcessFunction.java | 0
.../apache/paimon/flink/sink/cdc/CdcRecord.java | 0
.../flink/sink/cdc/CdcRecordChannelComputer.java | 0
.../sink/cdc/CdcRecordKeyAndBucketExtractor.java | 0
.../sink/cdc/CdcRecordPartitionKeyExtractor.java | 0
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 0
.../sink/cdc/CdcRecordStoreWriteOperator.java | 0
.../paimon/flink/sink/cdc/CdcRecordUtils.java | 0
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 0
.../sink/cdc/CdcWithBucketChannelComputer.java | 0
.../apache/paimon/flink/sink/cdc/EventParser.java | 0
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 0
.../apache/paimon/flink/sink/cdc/FlinkCdcSink.java | 0
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 0
...MultiTableUpdatedDataFieldsProcessFunction.java | 0
.../flink/sink/cdc/NewTableSchemaBuilder.java | 0
.../flink/sink/cdc/RichCdcMultiplexRecord.java | 0
.../cdc/RichCdcMultiplexRecordEventParser.java | 12 +++--
.../cdc/RichCdcMultiplexRecordSchemaBuilder.java | 0
.../paimon/flink/sink/cdc/RichCdcRecord.java | 0
.../paimon/flink/sink/cdc/RichCdcSinkBuilder.java | 0
.../paimon/flink/sink/cdc/RichEventParser.java | 0
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 0
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 0
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 5 ++
.../KafkaMaxwellSyncDatabaseActionITCase.java | 5 ++
.../kafka/KafkaOggSyncDatabaseActionITCase.java | 5 ++
.../mongodb/MongoDBSyncDatabaseActionITCase.java | 54 +++++++++++++++++++---
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 2 +
.../cdc/CdcMultiplexRecordChannelComputerTest.java | 0
.../sink/cdc/CdcRecordChannelComputerTest.java | 0
.../cdc/CdcRecordKeyAndBucketExtractorTest.java | 0
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 0
.../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 0
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 0
.../sink/cdc/FlinkCdcSyncTableSinkITCase.java | 0
.../apache/paimon/flink/sink/cdc/TestCdcEvent.java | 0
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 0
.../flink/sink/cdc/TestCdcSourceFunction.java | 0
.../apache/paimon/flink/sink/cdc/TestTable.java | 0
paimon-flink/paimon-flink-common/pom.xml | 8 ----
.../sink/MultiTablesStoreCompactOperator.java | 3 +-
62 files changed, 115 insertions(+), 97 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml
b/paimon-flink/paimon-flink-cdc/pom.xml
index d8e1591d2..3054993b3 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -202,6 +202,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 58ff74d84..a01ce3de6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
@@ -132,9 +131,7 @@ public class KafkaSchema {
int retryInterval = 1000;
DataFormat format = getDataFormat(kafkaConfig);
- RecordParser recordParser =
- format.createParser(
- true, new TableNameConverter(true), typeMapping,
Collections.emptyList());
+ RecordParser recordParser = format.createParser(true, typeMapping,
Collections.emptyList());
while (true) {
ConsumerRecords<String, String> consumerRecords =
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 3ad479a68..61aa45108 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -148,24 +148,26 @@ public class KafkaSyncDatabaseAction extends ActionBase {
}
catalog.createDatabase(database, true);
- TableNameConverter tableNameConverter =
- new TableNameConverter(caseSensitive, true, tablePrefix,
tableSuffix);
KafkaSource<String> source =
KafkaActionUtils.buildKafkaSource(kafkaConfig);
DataFormat format = DataFormat.getDataFormat(kafkaConfig);
RecordParser recordParser =
- format.createParser(
- caseSensitive, tableNameConverter, typeMapping,
Collections.emptyList());
+ format.createParser(caseSensitive, typeMapping,
Collections.emptyList());
RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
new RichCdcMultiplexRecordSchemaBuilder(tableConfig,
caseSensitive);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null :
Pattern.compile(excludingTables);
+ TableNameConverter tableNameConverter =
+ new TableNameConverter(caseSensitive, true, tablePrefix,
tableSuffix);
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
() ->
new RichCdcMultiplexRecordEventParser(
- schemaBuilder, includingPattern,
excludingPattern);
+ schemaBuilder,
+ includingPattern,
+ excludingPattern,
+ tableNameConverter);
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 0b362d4fb..2804b04e0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -26,7 +26,6 @@ import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
@@ -171,13 +170,9 @@ public class KafkaSyncTableAction extends ActionBase {
}
DataFormat format = DataFormat.getDataFormat(kafkaConfig);
RecordParser recordParser =
- format.createParser(
- caseSensitive,
- new TableNameConverter(caseSensitive),
- typeMapping,
- computedColumns);
+ format.createParser(caseSensitive, typeMapping,
computedColumns);
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
- RichCdcMultiplexRecordEventParser::new;
+ () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
index fda6311f2..71c4af8f4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
import
org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
@@ -54,16 +53,12 @@ public enum DataFormat {
* configurations.
*
* @param caseSensitive Indicates whether the parser should be
case-sensitive.
- * @param tableNameConverter Converter to transform table names.
* @param computedColumns List of computed columns to be considered by the
parser.
* @return A new instance of {@link RecordParser}.
*/
public RecordParser createParser(
- boolean caseSensitive,
- TableNameConverter tableNameConverter,
- TypeMapping typeMapping,
- List<ComputedColumn> computedColumns) {
- return parser.createParser(caseSensitive, typeMapping,
tableNameConverter, computedColumns);
+ boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
+ return parser.createParser(caseSensitive, typeMapping,
computedColumns);
}
/**
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index a31df4811..5e3d0f886 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -64,7 +63,6 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
protected static final String FIELD_TABLE = "table";
protected static final String FIELD_DATABASE = "database";
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- protected final TableNameConverter tableNameConverter;
protected final boolean caseSensitive;
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;
@@ -77,13 +75,9 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
protected String tableName;
public RecordParser(
- boolean caseSensitive,
- TypeMapping typeMapping,
- TableNameConverter tableNameConverter,
- List<ComputedColumn> computedColumns) {
+ boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
this.caseSensitive = caseSensitive;
this.typeMapping = typeMapping;
- this.tableNameConverter = tableNameConverter;
this.computedColumns = computedColumns;
}
@@ -93,7 +87,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
return null;
}
databaseName = extractStringFromRootJson(FIELD_DATABASE);
- tableName =
tableNameConverter.convert(extractStringFromRootJson(FIELD_TABLE));
+ tableName = extractStringFromRootJson(FIELD_TABLE);
this.setPrimaryField();
this.setDataField();
this.validateFormat();
@@ -148,7 +142,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
validateFormat();
databaseName = extractStringFromRootJson(FIELD_DATABASE);
- tableName =
tableNameConverter.convert(extractStringFromRootJson(FIELD_TABLE));
+ tableName = extractStringFromRootJson(FIELD_TABLE);
extractRecords().forEach(out::collect);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
index 413ecbc4f..a47bca23f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import java.util.List;
@@ -40,13 +39,9 @@ public interface RecordParserFactory {
*
* @param caseSensitive Indicates whether the parser should be
case-sensitive.
* @param typeMapping Data type mapping options.
- * @param tableNameConverter Converter to transform table names.
* @param computedColumns List of computed columns to be considered by the
parser.
* @return A new instance of {@link RecordParser}.
*/
RecordParser createParser(
- boolean caseSensitive,
- TypeMapping typeMapping,
- TableNameConverter tableNameConverter,
- List<ComputedColumn> computedColumns);
+ boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 42ac8c948..e0f2ec748 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
@@ -73,11 +72,8 @@ public class CanalRecordParser extends RecordParser {
}
public CanalRecordParser(
- boolean caseSensitive,
- TypeMapping typeMapping,
- TableNameConverter tableNameConverter,
- List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+ boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
+ super(caseSensitive, typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
index 0a2df9fcd..8b51edad9 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -54,11 +53,8 @@ public class MaxwellRecordParser extends RecordParser {
private static final String OP_DELETE = "delete";
public MaxwellRecordParser(
- boolean caseSensitive,
- TypeMapping typeMapping,
- TableNameConverter tableNameConverter,
- List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+ boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
+ super(caseSensitive, typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
index 24e5fe4b1..7f4fcc76a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -57,11 +56,8 @@ public class OggRecordParser extends RecordParser {
private static final String OP_DELETE = "D";
public OggRecordParser(
- boolean caseSensitive,
- TypeMapping typeMapping,
- TableNameConverter tableNameConverter,
- List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, tableNameConverter, computedColumns);
+ boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
+ super(caseSensitive, typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index 3f2c33f8e..31ce83df1 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import
org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
import
org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -61,24 +60,18 @@ public class MongoDBRecordParser implements
FlatMapFunction<String, RichCdcMulti
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final List<ComputedColumn> computedColumns;
private final boolean caseSensitive;
- private final TableNameConverter tableNameConverter;
private final Configuration mongodbConfig;
private JsonNode root;
- public MongoDBRecordParser(
- boolean caseSensitive,
- TableNameConverter tableNameConverter,
- Configuration mongodbConfig) {
- this(caseSensitive, tableNameConverter, Collections.emptyList(),
mongodbConfig);
+ public MongoDBRecordParser(boolean caseSensitive, Configuration
mongodbConfig) {
+ this(caseSensitive, Collections.emptyList(), mongodbConfig);
}
public MongoDBRecordParser(
boolean caseSensitive,
- TableNameConverter tableNameConverter,
List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
this.caseSensitive = caseSensitive;
- this.tableNameConverter = tableNameConverter;
this.computedColumns = computedColumns;
this.mongodbConfig = mongodbConfig;
}
@@ -87,7 +80,7 @@ public class MongoDBRecordParser implements
FlatMapFunction<String, RichCdcMulti
public void flatMap(String value, Collector<RichCdcMultiplexRecord> out)
throws Exception {
root = OBJECT_MAPPER.readValue(value, JsonNode.class);
String databaseName = extractString(FIELD_DATABASE);
- String collection =
tableNameConverter.convert(extractString(FIELD_TABLE));
+ String collection = extractString(FIELD_TABLE);
MongoVersionStrategy versionStrategy =
VersionStrategyFactory.create(
databaseName, collection, caseSensitive,
computedColumns, mongodbConfig);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index ea4d7a850..b407e22a5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -125,10 +125,8 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
}
catalog.createDatabase(database, true);
- TableNameConverter tableNameConverter =
- new TableNameConverter(caseSensitive, true, tablePrefix,
tableSuffix);
- List<Identifier> excludedTables = new ArrayList<>();
+ List<Identifier> excludedTables = new ArrayList<>();
MongoDBSource<String> source =
MongoDBActionUtils.buildMongodbSource(
mongodbConfig,
@@ -143,16 +141,19 @@ public class MongoDBSyncDatabaseAction extends ActionBase
{
Pattern includingPattern = Pattern.compile(this.includingTables);
Pattern excludingPattern =
excludingTables == null ? null :
Pattern.compile(excludingTables);
+ TableNameConverter tableNameConverter =
+ new TableNameConverter(caseSensitive, true, tablePrefix,
tableSuffix);
parserFactory =
() ->
new RichCdcMultiplexRecordEventParser(
- schemaBuilder, includingPattern,
excludingPattern);
+ schemaBuilder,
+ includingPattern,
+ excludingPattern,
+ tableNameConverter);
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(source,
WatermarkStrategy.noWatermarks(), "MongoDB Source")
- .flatMap(
- new MongoDBRecordParser(
- caseSensitive,
tableNameConverter, mongodbConfig)))
+ .flatMap(new
MongoDBRecordParser(caseSensitive, mongodbConfig)))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 1c6b78ae3..3b25c2b69 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -23,7 +23,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -150,7 +149,7 @@ public class MongoDBSyncTableAction extends ActionBase {
}
EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
- RichCdcMultiplexRecordEventParser::new;
+ () -> new RichCdcMultiplexRecordEventParser(caseSensitive);
CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
new CdcSinkBuilder<RichCdcMultiplexRecord>()
@@ -162,7 +161,6 @@ public class MongoDBSyncTableAction extends ActionBase {
.flatMap(
new MongoDBRecordParser(
caseSensitive,
- new
TableNameConverter(caseSensitive),
computedColumns,
mongodbConfig)))
.withParserFactory(parserFactory)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcHashKeyChannelComputer.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordPartitionKeyExtractor.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
similarity index 90%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 5153664ad..0e19f8c88 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
@@ -44,6 +45,7 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
private final NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
+ private final TableNameConverter tableNameConverter;
private final Map<String, RichEventParser> parsers = new HashMap<>();
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();
@@ -54,17 +56,19 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;
- public RichCdcMultiplexRecordEventParser() {
- this(record -> Optional.empty(), null, null);
+ public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
+ this(record -> Optional.empty(), null, null, new
TableNameConverter(caseSensitive));
}
public RichCdcMultiplexRecordEventParser(
NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder,
@Nullable Pattern includingPattern,
- @Nullable Pattern excludingPattern) {
+ @Nullable Pattern excludingPattern,
+ TableNameConverter tableNameConverter) {
this.schemaBuilder = schemaBuilder;
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
+ this.tableNameConverter = tableNameConverter;
}
@Override
@@ -80,7 +84,7 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
@Override
public String parseTableName() {
- return currentTable;
+ return tableNameConverter.convert(currentTable);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index d7a2bbc8f..62253a6c8 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static org.assertj.core.api.Assertions.assertThat;
@@ -296,6 +297,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
.withTablePrefix("test_prefix_")
.withTableSuffix("_test_suffix")
.withTableConfig(getBasicTableConfig())
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
.build();
runActionWithDefaultEnv(action);
@@ -345,6 +348,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
.withTablePrefix("test_prefix_")
.withTableSuffix("_test_suffix")
.withTableConfig(getBasicTableConfig())
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
index 5a38bd542..645a0a9c9 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaMaxwellSyncDatabaseActionITCase.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -263,6 +264,8 @@ public class KafkaMaxwellSyncDatabaseActionITCase extends
KafkaActionITCaseBase
.withTablePrefix("test_prefix_")
.withTableSuffix("_test_suffix")
.withTableConfig(getBasicTableConfig())
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
.build();
runActionWithDefaultEnv(action);
@@ -317,6 +320,8 @@ public class KafkaMaxwellSyncDatabaseActionITCase extends
KafkaActionITCaseBase
.withTablePrefix("test_prefix_")
.withTableSuffix("_test_suffix")
.withTableConfig(getBasicTableConfig())
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
index 538b109a2..b452aa443 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -261,6 +262,8 @@ public class KafkaOggSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
.withTablePrefix("TEST_PREFIX_")
.withTableSuffix("_TEST_SUFFIX")
.withTableConfig(getBasicTableConfig())
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "T1|T2" : ".*")
.build();
runActionWithDefaultEnv(action);
@@ -312,6 +315,8 @@ public class KafkaOggSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
.withTablePrefix("TEST_PREFIX_")
.withTableSuffix("_TEST_SUFFIX")
.withTableConfig(getBasicTableConfig())
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "T1|T2" : ".*")
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 0b55ce8ab..1e736534f 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -52,14 +53,15 @@ public class MongoDBSyncDatabaseActionITCase extends
MongoDBActionITCaseBase {
.build();
runActionWithDefaultEnv(action);
- testSchemaEvolutionImpl();
+ testSchemaEvolutionImpl("t1", "t2", database);
}
- private void testSchemaEvolutionImpl() throws Exception {
- waitingTables("t1", "t2");
+ private void testSchemaEvolutionImpl(String table1Name, String table2Name,
String dbName)
+ throws Exception {
+ waitingTables(table1Name, table2Name);
- FileStoreTable table1 = getFileStoreTable("t1");
- FileStoreTable table2 = getFileStoreTable("t2");
+ FileStoreTable table1 = getFileStoreTable(table1Name);
+ FileStoreTable table2 = getFileStoreTable(table2Name);
RowType rowType1 =
RowType.of(
@@ -95,7 +97,7 @@ public class MongoDBSyncDatabaseActionITCase extends
MongoDBActionITCaseBase {
"+I[100000000000000000000103, user_3, Hangzhou,
1235567891234]");
waitForResult(expected, table2, rowType2, primaryKeys2);
- writeRecordsToMongoDB("test-data-3", database, "database");
+ writeRecordsToMongoDB("test-data-3", dbName, "database");
expected =
Arrays.asList(
@@ -104,7 +106,7 @@ public class MongoDBSyncDatabaseActionITCase extends
MongoDBActionITCaseBase {
"+I[100000000000000000000103, 12-pack drill bits, Set
of 12 professional-grade drill bits, 0.8]");
waitForResult(expected, table1, rowType1, primaryKeys1);
- writeRecordsToMongoDB("test-data-4", database, "database");
+ writeRecordsToMongoDB("test-data-4", dbName, "database");
expected =
Arrays.asList(
@@ -237,4 +239,42 @@ public class MongoDBSyncDatabaseActionITCase extends
MongoDBActionITCaseBase {
"+I[610000000000000000000103,
youtube#videoListResponse,
\\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\",
{\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1},
[{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":
[...]
waitForResult(expected2, table2, rowType2, primaryKeys2);
}
+
+ @Test
+ @Timeout(60)
+ public void testTableAffix() throws Exception {
+ // create table t1
+ createFileStoreTable(
+ "test_prefix_t1_test_suffix",
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"_id", "name", "description", "weight"}),
+ Collections.emptyList(),
+ Collections.singletonList("_id"),
+ Collections.emptyMap());
+
+ // try synchronization
+ String dbName = database + UUID.randomUUID();
+ writeRecordsToMongoDB("test-data-1", dbName, "database");
+ writeRecordsToMongoDB("test-data-2", dbName, "database");
+
+ Map<String, String> mongodbConfig = getBasicMongoDBConfig();
+ mongodbConfig.put("database", dbName);
+ MongoDBSyncDatabaseAction action =
+ syncDatabaseActionBuilder(mongodbConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withTablePrefix("test_prefix_")
+ .withTableSuffix("_test_suffix")
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
+ .build();
+ runActionWithDefaultEnv(action);
+
+ testSchemaEvolutionImpl("test_prefix_t1_test_suffix",
"test_prefix_t2_test_suffix", dbName);
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 8990a73b7..265c4ea37 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -298,6 +298,8 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
.withTableConfig(getBasicTableConfig())
.withTablePrefix("test_prefix_")
.withTableSuffix("_test_suffix")
+ // test including check with affix
+
.includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
.build();
runActionWithDefaultEnv(action);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 8e6f0e840..9bea87fcc 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -162,14 +162,6 @@ under the License.
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 8ce822bc2..15948d6d5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -45,7 +45,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
-import static
org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
/**
@@ -221,7 +220,7 @@ public class MultiTablesStoreCompactOperator
// table not found, waiting until table is created by
// upstream operators
}
- Thread.sleep(RETRY_SLEEP_TIME.defaultValue().toMillis());
+ Thread.sleep(500);
}
}
return table;