This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ba587ecd4 [cdc] Database sync supports specify partition keys and
primary keys (#3559)
ba587ecd4 is described below
commit ba587ecd4767d377adbc0dada05f729b1169e4b5
Author: yuzelin <[email protected]>
AuthorDate: Fri Jun 21 15:50:11 2024 +0800
[cdc] Database sync supports specify partition keys and primary keys (#3559)
---
docs/content/flink/cdc-ingestion/kafka-cdc.md | 4 +-
docs/content/flink/cdc-ingestion/mongo-cdc.md | 2 +
docs/content/flink/cdc-ingestion/mysql-cdc.md | 2 +
docs/content/flink/cdc-ingestion/pulsar-cdc.md | 2 +
.../shortcodes/generated/kafka_sync_database.html | 12 +++
.../generated/mongodb_sync_database.html | 11 +++
.../shortcodes/generated/mysql_sync_database.html | 11 +++
.../shortcodes/generated/pulsar_sync_database.html | 11 +++
.../flink/action/cdc/CdcActionCommonUtils.java | 105 ++++++++++++++++-----
.../cdc/MessageQueueSyncTableActionBase.java | 1 +
.../flink/action/cdc/SyncDatabaseActionBase.java | 16 +++-
.../action/cdc/SyncDatabaseActionFactoryBase.java | 13 ++-
.../flink/action/cdc/SyncTableActionBase.java | 1 +
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 5 +-
.../flink/sink/cdc/NewTableSchemaBuilder.java | 56 ++++++-----
.../flink/action/cdc/CdcActionITCaseBase.java | 14 +++
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 56 +++++++++++
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 73 ++++++++++++++
.../cdc/mysql/MySqlSyncTableActionITCase.java | 9 +-
.../postgres/PostgresSyncTableActionITCase.java | 9 +-
.../canal/database/specify-keys/canal-data-1.txt | 20 ++++
.../test/resources/mysql/sync_database_setup.sql | 20 ++++
22 files changed, 386 insertions(+), 67 deletions(-)
diff --git a/docs/content/flink/cdc-ingestion/kafka-cdc.md
b/docs/content/flink/cdc-ingestion/kafka-cdc.md
index 9caf53119..260e33d8b 100644
--- a/docs/content/flink/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/flink/cdc-ingestion/kafka-cdc.md
@@ -199,6 +199,8 @@ To use this feature through `flink run`, run the following
shell command.
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
[--type_mapping to-string] \
+ [--partition_keys <partition-keys>] \
+ [--primary_keys <primary-keys>] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf
<paimon-table-sink-conf> ...]]
@@ -206,8 +208,6 @@ To use this feature through `flink run`, run the following
shell command.
{{< generated/kafka_sync_database >}}
-Only tables with primary keys will be synchronized.
-
This action will build a single combined sink for all tables. For each Kafka
topic's table to be synchronized, if the
corresponding Paimon table does not exist, this action will automatically
create the table, and its schema will be derived
from all specified Kafka topic's tables. If the Paimon table already exists
and its schema is different from that parsed
diff --git a/docs/content/flink/cdc-ingestion/mongo-cdc.md
b/docs/content/flink/cdc-ingestion/mongo-cdc.md
index 17c70ba40..a73f746c6 100644
--- a/docs/content/flink/cdc-ingestion/mongo-cdc.md
+++ b/docs/content/flink/cdc-ingestion/mongo-cdc.md
@@ -194,6 +194,8 @@ To use this feature through `flink run`, run the following
shell command.
[--table_suffix <paimon-table-suffix>] \
[--including_tables <mongodb-table-name|name-regular-expr>] \
[--excluding_tables <mongodb-table-name|name-regular-expr>] \
+ [--partition_keys <partition-keys>] \
+ [--primary_keys <primary-keys>] \
[--mongodb_conf <mongodb-cdc-source-conf> [--mongodb_conf
<mongodb-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf
<paimon-table-sink-conf> ...]]
diff --git a/docs/content/flink/cdc-ingestion/mysql-cdc.md
b/docs/content/flink/cdc-ingestion/mysql-cdc.md
index 2bfc38c35..f12a268a1 100644
--- a/docs/content/flink/cdc-ingestion/mysql-cdc.md
+++ b/docs/content/flink/cdc-ingestion/mysql-cdc.md
@@ -137,6 +137,8 @@ To use this feature through `flink run`, run the following
shell command.
[--mode <sync-mode>] \
[--metadata_column <metadata-column>] \
[--type_mapping <option1,option2...>] \
+ [--partition_keys <partition-keys>] \
+ [--primary_keys <primary-keys>] \
[--mysql_conf <mysql-cdc-source-conf> [--mysql_conf
<mysql-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf
<paimon-table-sink-conf> ...]]
diff --git a/docs/content/flink/cdc-ingestion/pulsar-cdc.md
b/docs/content/flink/cdc-ingestion/pulsar-cdc.md
index 76573c229..ca2ccaefb 100644
--- a/docs/content/flink/cdc-ingestion/pulsar-cdc.md
+++ b/docs/content/flink/cdc-ingestion/pulsar-cdc.md
@@ -198,6 +198,8 @@ To use this feature through `flink run`, run the following
shell command.
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
[--type_mapping to-string] \
+ [--partition_keys <partition-keys>] \
+ [--primary_keys <primary-keys>] \
[--pulsar_conf <pulsar-source-conf> [--pulsar_conf <pulsar-source-conf>
...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf
<paimon-table-sink-conf> ...]]
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 2b2533b93..c598e950b 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -69,6 +69,18 @@ under the License.
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>--partition_keys</h5></td>
+ <td>The partition keys for the Paimon table. If there are multiple
partition keys, separate them with commas, for example "dt,hh,mm".
+ If not all of the keys exist in the source table, the sink table won't set
partition keys.</td>
+ </tr>
+ <tr>
+ <td><h5>--primary_keys</h5></td>
+ <td>The primary keys for the Paimon table. If there are multiple primary
keys, separate them with commas, for example "buyer_id,seller_id".
+ If not all of the keys exist in the source table but the source table has
primary keys, the sink table will use the source table's primary keys.
+ Otherwise, the sink table won't set primary keys.</td>
+ </tr>
+ <tr>
<tr>
<td><h5>--kafka_conf</h5></td>
<td>The configuration for Flink Kafka sources. Each configuration
should be specified in the format `key=value`. `properties.bootstrap.servers`,
`topic/topic-pattern`, `properties.group.id`, and `value.format` are required
configurations, others are optional.See its <a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a>
for a complete list of configurations.</td>
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
index 9e27ec9ef..fa450ca12 100644
--- a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
@@ -49,6 +49,17 @@ under the License.
<td><h5>--excluding_tables</h5></td>
<td>It is used to specify which source tables are not to be
synchronized. The usage is same as "--including_tables". "--excluding_tables"
has higher priority than "--including_tables" if you specified both.</td>
</tr>
+ <tr>
+ <td><h5>--partition_keys</h5></td>
+ <td>The partition keys for the Paimon table. If there are multiple
partition keys, separate them with commas, for example "dt,hh,mm".
+ If not all of the keys exist in the source table, the sink table won't set
partition keys.</td>
+ </tr>
+ <tr>
+ <td><h5>--primary_keys</h5></td>
+ <td>The primary keys for the Paimon table. If there are multiple primary
keys, separate them with commas, for example "buyer_id,seller_id".
+ If not all of the keys exist in the source table but the source table has
primary keys, the sink table will use the source table's primary keys.
+ Otherwise, the sink table won't set primary keys.</td>
+ </tr>
<tr>
<td><h5>--mongodb_conf</h5></td>
<td>The configuration for Flink CDC MongoDB sources. Each
configuration should be specified in the format "key=value". hosts, username,
password, database are required configurations, others are optional. See its <a
href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mongodb-cdc/#connector-options">document</a>
for a complete list of configurations.</td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index c7793cc2e..0cf7360d8 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -81,6 +81,17 @@ under the License.
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>--partition_keys</h5></td>
+ <td>The partition keys for the Paimon table. If there are multiple
partition keys, separate them with commas, for example "dt,hh,mm".
+ If not all of the keys exist in the source table, the sink table won't set
partition keys.</td>
+ </tr>
+ <tr>
+ <td><h5>--primary_keys</h5></td>
+ <td>The primary keys for the Paimon table. If there are multiple primary
keys, separate them with commas, for example "buyer_id,seller_id".
+ If not all of the keys exist in the source table but the source table has
primary keys, the sink table will use the source table's primary keys.
+ Otherwise, the sink table won't set primary keys.</td>
+ </tr>
<tr>
<td><h5>--mysql_conf</h5></td>
<td>The configuration for Flink CDC MySQL sources. Each configuration
should be specified in the format "key=value". hostname, username, password,
database-name and table-name are required configurations, others are optional.
See its <a
href="https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/docs/connectors/legacy-flink-cdc-sources/mysql-cdc/#connector-options">document</a>
for a complete list of configurations.</td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_database.html
b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
index 307e1b41c..23125b2f8 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_database.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
@@ -69,6 +69,17 @@ under the License.
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>--partition_keys</h5></td>
+ <td>The partition keys for the Paimon table. If there are multiple
partition keys, separate them with commas, for example "dt,hh,mm".
+ If not all of the keys exist in the source table, the sink table won't set
partition keys.</td>
+ </tr>
+ <tr>
+ <td><h5>--primary_keys</h5></td>
+ <td>The primary keys for the Paimon table. If there are multiple primary
keys, separate them with commas, for example "buyer_id,seller_id".
+ If not all of the keys exist in the source table but the source table has
primary keys, the sink table will use the source table's primary keys.
+ Otherwise, the sink table won't set primary keys.</td>
+ </tr>
<tr>
<td><h5>--pulsar_conf</h5></td>
<td>The configuration for Flink Pulsar sources. Each configuration
should be specified in the format `key=value`. `topic/topic-pattern`,
`value.format`, `pulsar.client.serviceUrl`, `pulsar.admin.adminUrl`, and
`pulsar.consumer.subscriptionName` are required configurations, others are
optional.See its <a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pulsar/#source-configurable-options">document</a>
for a complete list of configurations.</td>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index bc307cca4..3e85c7c88 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -36,7 +36,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
@@ -114,6 +113,7 @@ public class CdcActionCommonUtils {
Schema sourceSchema,
CdcMetadataConverter[] metadataConverters,
boolean caseSensitive,
+ boolean strictlyCheckSpecified,
boolean requirePrimaryKeys) {
Schema.Builder builder = Schema.newBuilder();
@@ -147,39 +147,92 @@ public class CdcActionCommonUtils {
checkDuplicateFields(tableName, allFieldNames);
// primary keys
+ specifiedPrimaryKeys = listCaseConvert(specifiedPrimaryKeys,
caseSensitive);
+ List<String> sourceSchemaPrimaryKeys =
+ listCaseConvert(sourceSchema.primaryKeys(), caseSensitive);
+ setPrimaryKeys(
+ tableName,
+ builder,
+ specifiedPrimaryKeys,
+ sourceSchemaPrimaryKeys,
+ allFieldNames,
+ strictlyCheckSpecified,
+ requirePrimaryKeys);
+
+ // partition keys
+ specifiedPartitionKeys = listCaseConvert(specifiedPartitionKeys,
caseSensitive);
+ setPartitionKeys(
+ tableName, builder, specifiedPartitionKeys, allFieldNames,
strictlyCheckSpecified);
+
+ // comment
+ builder.comment(sourceSchema.comment());
+
+ return builder.build();
+ }
+
+ private static void setPrimaryKeys(
+ String tableName,
+ Schema.Builder builder,
+ List<String> specifiedPrimaryKeys,
+ List<String> sourceSchemaPrimaryKeys,
+ List<String> allFieldNames,
+ boolean strictlyCheckSpecified,
+ boolean requirePrimaryKeys) {
if (!specifiedPrimaryKeys.isEmpty()) {
- Set<String> sourceColumns =
-
sourceSchema.fields().stream().map(DataField::name).collect(Collectors.toSet());
- sourceColumns.addAll(
- computedColumns.stream()
- .map(ComputedColumn::columnName)
- .collect(Collectors.toSet()));
- for (String key : specifiedPrimaryKeys) {
- checkArgument(
- sourceColumns.contains(key),
- "Specified primary key '%s' does not exist in source
tables or computed columns %s.",
- key,
- sourceColumns);
+ if (allFieldNames.containsAll(specifiedPrimaryKeys)) {
+ builder.primaryKey(specifiedPrimaryKeys);
+ return;
+ }
+
+ String message =
+ String.format(
+ "For sink table %s, not all specified primary keys
'%s' exist in source tables or computed columns '%s'.",
+ tableName, specifiedPrimaryKeys, allFieldNames);
+ if (strictlyCheckSpecified) {
+ throw new IllegalArgumentException(message);
+ } else {
+ LOG.info(
+ "{} In this case at database-sync, we will set primary
keys from source tables if exist, otherwise, primary keys are not set.",
+ message);
}
- builder.primaryKey(listCaseConvert(specifiedPrimaryKeys,
caseSensitive));
- } else if (!sourceSchema.primaryKeys().isEmpty()) {
- builder.primaryKey(listCaseConvert(sourceSchema.primaryKeys(),
caseSensitive));
- } else if (requirePrimaryKeys) {
+ }
+
+ if (!sourceSchemaPrimaryKeys.isEmpty()) {
+ builder.primaryKey(sourceSchemaPrimaryKeys);
+ return;
+ }
+
+ if (requirePrimaryKeys) {
throw new IllegalArgumentException(
- "Primary keys are not specified. "
- + "Also, can't infer primary keys from source
table schemas because "
+ "Failed to set specified primary keys for sink table "
+ + tableName
+ + ". Also, can't infer primary keys from source
table schemas because "
+ "source tables have no primary keys or have
different primary keys.");
}
+ }
- // partition keys
+ private static void setPartitionKeys(
+ String tableName,
+ Schema.Builder builder,
+ List<String> specifiedPartitionKeys,
+ List<String> allFieldNames,
+ boolean strictlyCheckSpecified) {
if (!specifiedPartitionKeys.isEmpty()) {
- builder.partitionKeys(listCaseConvert(specifiedPartitionKeys,
caseSensitive));
- }
-
- // comment
- builder.comment(sourceSchema.comment());
+ if (allFieldNames.containsAll(specifiedPartitionKeys)) {
+ builder.partitionKeys(specifiedPartitionKeys);
+ return;
+ }
- return builder.build();
+ String message =
+ String.format(
+ "For sink table %s, not all specified partition
keys '%s' exist in source tables or computed columns '%s'.",
+ tableName, specifiedPartitionKeys, allFieldNames);
+ if (strictlyCheckSpecified) {
+ throw new IllegalArgumentException(message);
+ } else {
+ LOG.info("{} In this case at database-sync, partition keys are
not set.", message);
+ }
+ }
}
public static void checkDuplicateFields(String tableName, List<String>
fieldNames) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index fb505bb8b..ffc05aec0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -80,6 +80,7 @@ public abstract class MessageQueueSyncTableActionBase extends
SyncTableActionBas
retrievedSchema,
metadataConverters,
caseSensitive,
+ true,
false);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4c7f8ea27..f77bec044 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -49,6 +50,8 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
protected String tablePrefix = "";
protected String tableSuffix = "";
protected String includingTables = ".*";
+ protected List<String> partitionKeys = new ArrayList<>();
+ protected List<String> primaryKeys = new ArrayList<>();
@Nullable protected String excludingTables;
protected List<FileStoreTable> tables = new ArrayList<>();
@@ -102,6 +105,16 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
return this;
}
+ public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
+ this.partitionKeys.addAll(Arrays.asList(partitionKeys));
+ return this;
+ }
+
+ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
+ this.primaryKeys.addAll(Arrays.asList(primaryKeys));
+ return this;
+ }
+
@Override
protected void validateCaseSensitivity() {
AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database",
database);
@@ -118,7 +131,8 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
@Override
protected EventParser.Factory<RichCdcMultiplexRecord>
buildEventParserFactory() {
NewTableSchemaBuilder schemaBuilder =
- new NewTableSchemaBuilder(tableConfig, caseSensitive,
metadataConverters);
+ new NewTableSchemaBuilder(
+ tableConfig, caseSensitive, partitionKeys,
primaryKeys, metadataConverters);
Pattern includingPattern = Pattern.compile(includingTables);
Pattern excludingPattern =
excludingTables == null ? null :
Pattern.compile(excludingTables);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index 3d8797057..996ecf4c7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -26,6 +26,8 @@ import java.util.Optional;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
@@ -49,7 +51,16 @@ public abstract class SyncDatabaseActionFactoryBase<T
extends SyncDatabaseAction
action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.includingTables(params.get(INCLUDING_TABLES))
- .excludingTables(params.get(EXCLUDING_TABLES));
+ .excludingTables(params.get(EXCLUDING_TABLES))
+ .withPartitionKeys();
+
+ if (params.has(PARTITION_KEYS)) {
+ action.withPartitionKeys(params.get(PARTITION_KEYS).split(","));
+ }
+
+ if (params.has(PRIMARY_KEYS)) {
+ action.withPrimaryKeys(params.get(PRIMARY_KEYS).split(","));
+ }
if (params.has(TYPE_MAPPING)) {
String[] options = params.get(TYPE_MAPPING).split(",");
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index 1a847d3a0..e335fc2be 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -108,6 +108,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
retrievedSchema,
metadataConverters,
caseSensitive,
+ true,
true);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 80a7c8b84..a33f2c978 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -146,13 +146,14 @@ public class MySqlSyncDatabaseAction extends
SyncDatabaseActionBase {
Schema fromMySql =
CdcActionCommonUtils.buildPaimonSchema(
identifier.getFullName(),
- Collections.emptyList(),
- Collections.emptyList(),
+ partitionKeys,
+ primaryKeys,
Collections.emptyList(),
tableConfig,
tableInfo.schema(),
metadataConverters,
caseSensitive,
+ false,
true);
try {
table = (FileStoreTable) catalog.getTable(identifier);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index cef7f011a..33dfd576f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -20,58 +20,56 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataField;
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkDuplicateFields;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.buildPaimonSchema;
/** Build schema for new table found in database synchronization. */
public class NewTableSchemaBuilder implements Serializable {
private final Map<String, String> tableConfig;
private final boolean caseSensitive;
+ private final List<String> partitionKeys;
+ private final List<String> primaryKeys;
private final CdcMetadataConverter[] metadataConverters;
public NewTableSchemaBuilder(
Map<String, String> tableConfig,
boolean caseSensitive,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
CdcMetadataConverter[] metadataConverters) {
this.tableConfig = tableConfig;
this.caseSensitive = caseSensitive;
this.metadataConverters = metadataConverters;
+ this.partitionKeys = partitionKeys;
+ this.primaryKeys = primaryKeys;
}
public Optional<Schema> build(RichCdcMultiplexRecord record) {
- Schema.Builder builder = Schema.newBuilder();
- builder.options(tableConfig);
-
- // fields
- List<String> allFieldNames = new ArrayList<>();
-
- for (DataField dataField : record.fields()) {
- String fieldName = caseSensitiveConversion(dataField.name(),
caseSensitive);
- allFieldNames.add(fieldName);
- builder.column(fieldName, dataField.type(),
dataField.description());
- }
-
- for (CdcMetadataConverter metadataConverter : metadataConverters) {
- String metadataColumnName =
- caseSensitiveConversion(metadataConverter.columnName(),
caseSensitive);
- allFieldNames.add(metadataColumnName);
- builder.column(metadataColumnName, metadataConverter.dataType());
- }
-
- checkDuplicateFields(record.tableName(), allFieldNames);
-
- builder.primaryKey(listCaseConvert(record.primaryKeys(),
caseSensitive));
-
- return Optional.of(builder.build());
+ Schema sourceSchema =
+ new Schema(
+ record.fields(),
+ Collections.emptyList(),
+ record.primaryKeys(),
+ Collections.emptyMap(),
+ null);
+ return Optional.of(
+ buildPaimonSchema(
+ record.tableName(),
+ partitionKeys,
+ primaryKeys,
+ Collections.emptyList(),
+ tableConfig,
+ sourceSchema,
+ metadataConverters,
+ caseSensitive,
+ false,
+ true));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 63b42e627..468a3074d 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -361,6 +361,8 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
@Nullable private String excludingTables;
@Nullable private String mode;
private final List<String> typeMappingModes = new ArrayList<>();
+ private final List<String> partitionKeys = new ArrayList<>();
+ private final List<String> primaryKeys = new ArrayList<>();
private final List<String> metadataColumn = new ArrayList<>();
public SyncDatabaseActionBuilder(Class<T> clazz, Map<String, String>
sourceConfig) {
@@ -418,6 +420,16 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
return this;
}
+ public SyncDatabaseActionBuilder<T> withPartitionKeys(String...
partitionKeys) {
+ this.partitionKeys.addAll(Arrays.asList(partitionKeys));
+ return this;
+ }
+
+ public SyncDatabaseActionBuilder<T> withPrimaryKeys(String...
primaryKeys) {
+ this.primaryKeys.addAll(Arrays.asList(primaryKeys));
+ return this;
+ }
+
public SyncDatabaseActionBuilder<T> withMetadataColumn(List<String>
metadataColumn) {
this.metadataColumn.addAll(metadataColumn);
return this;
@@ -446,6 +458,8 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
args.addAll(nullableToArgs("--mode", mode));
args.addAll(listToArgs("--type-mapping", typeMappingModes));
+ args.addAll(listToArgs("--partition-keys", partitionKeys));
+ args.addAll(listToArgs("--primary-keys", primaryKeys));
args.addAll(listToArgs("--metadata-column", metadataColumn));
return createAction(clazz, args);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 01cb78dae..025a6cce0 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -584,4 +584,60 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
+ "{databaseName=null, tableName=null,
fields=[`k` STRING, `v0` STRING, `v1` STRING], "
+ "primaryKeys=[], cdcRecord=+I
{v0=five, k=5, v1=50}}"));
}
+
+ @Test
+ @Timeout(60)
+ public void testSpecifyKeys() throws Exception {
+ final String topic = "specify-keys";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic,
"kafka/canal/database/specify-keys/canal-data-1.txt");
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withPartitionKeys("part")
+ .withPrimaryKeys("k", "part")
+ .build();
+ runActionWithDefaultEnv(action);
+
+ waitingTables("t1", "t2");
+
+ FileStoreTable table1 = getFileStoreTable("t1");
+ assertThat(table1.partitionKeys()).containsExactly("part");
+ assertThat(table1.primaryKeys()).containsExactly("k", "part");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ },
+ new String[] {"k", "part", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, 1, A]"),
+ table1,
+ rowType1,
+ Arrays.asList("k", "part"));
+
+ FileStoreTable table2 = getFileStoreTable("t2");
+ assertThat(table2.partitionKeys()).isEmpty();
+ assertThat(table2.primaryKeys()).containsExactly("k");
+
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
+ },
+ new String[] {"k", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, A]"),
+ table2,
+ rowType2,
+ Collections.singletonList("k"));
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index b1dda4736..b48b898d6 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -1361,6 +1361,79 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
}
}
+ /**
+ * Syncs database {@code test_specify_keys} with partition keys {@code [part]} and primary keys
+ * {@code [k, part]} specified for all tables, using a randomly chosen sink mode (DIVIDED or
+ * COMBINED).
+ *
+ * <p>Tables that have a {@code part} column (t1, and t3 created at runtime) are checked for the
+ * specified keys. Tables without it (t2, and t4 created at runtime) are checked to be
+ * unpartitioned with primary key {@code [k]}. Tables created after the job has started are only
+ * checked in COMBINED mode.
+ */
+ @Test
+ @Timeout(60)
+ public void testSpecifyKeys() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "test_specify_keys");
+
+ // Exercise either sink mode; the choice differs between runs.
+ MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ?
DIVIDED : COMBINED;
+ MySqlSyncDatabaseAction action =
+ syncDatabaseActionBuilder(mySqlConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withMode(mode.configString())
+ .withPartitionKeys("part")
+ .withPrimaryKeys("k", "part")
+ .build();
+ runActionWithDefaultEnv(action);
+
+ // Insert one row into each pre-existing source table, then check the sink keys and data.
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate("USE test_specify_keys");
+ testSpecifyKeysVerify1("t1", statement);
+ testSpecifyKeysVerify2("t2", statement);
+
+ // Tables created while the job is running are only verified in COMBINED mode
+ // (presumably because DIVIDED mode does not pick up new tables at runtime; confirm).
+ // t3 mirrors t1's columns (has `part`); t4 mirrors t2's (no `part`).
+ if (mode == COMBINED) {
+ statement.executeUpdate(
+ "CREATE TABLE t3 (k INT, part INT, v1 VARCHAR(10),
PRIMARY KEY (k))");
+ statement.executeUpdate("CREATE TABLE t4 (k INT, v1
VARCHAR(10), PRIMARY KEY (k))");
+ waitingTables("t3", "t4");
+ testSpecifyKeysVerify1("t3", statement);
+ testSpecifyKeysVerify2("t4", statement);
+ }
+ }
+ }
+
+ /**
+ * Checks a sink table whose source has a {@code part} column. It asserts that the sink table
+ * was created with partition keys {@code [part]} and primary keys {@code [k, part]}. It then
+ * inserts {@code (1, 1, 'A')} into the source table through {@code statement} and waits until
+ * that row appears in the sink table.
+ *
+ * @param tableName name of a source table with columns {@code (k, part, v1)}
+ * @param statement a source-database statement whose current database contains {@code
+ * tableName}
+ */
+ private void testSpecifyKeysVerify1(String tableName, Statement statement)
throws Exception {
+ FileStoreTable table = getFileStoreTable(tableName);
+ // Both the specified partition key and the specified primary keys are expected to apply.
+ assertThat(table.partitionKeys()).containsExactly("part");
+ assertThat(table.primaryKeys()).containsExactly("k", "part");
+
+ statement.executeUpdate("INSERT INTO " + tableName + " VALUES(1, 1,
'A')");
+ // Expected sink row type: the primary-key columns k and part are NOT NULL.
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ },
+ new String[] {"k", "part", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, 1, A]"),
+ table,
+ rowType,
+ Arrays.asList("k", "part"));
+ }
+
+ /**
+ * Checks a sink table whose source has no {@code part} column. It asserts that the sink table
+ * has no partition keys and has primary keys {@code [k]}. It then inserts {@code (1, 'A')} into
+ * the source table through {@code statement} and waits until that row appears in the sink
+ * table.
+ *
+ * @param tableName name of a source table with columns {@code (k, v1)}
+ * @param statement a source-database statement whose current database contains {@code
+ * tableName}
+ */
+ private void testSpecifyKeysVerify2(String tableName, Statement statement)
throws Exception {
+ FileStoreTable table = getFileStoreTable(tableName);
+ // The specified keys reference `part`, which this table lacks. The sink is expected to be
+ // unpartitioned with primary key k, presumably taken from the source PRIMARY KEY (k).
+ assertThat(table.partitionKeys()).isEmpty();
+ assertThat(table.primaryKeys()).containsExactly("k");
+
+ statement.executeUpdate("INSERT INTO " + tableName + " VALUES(1,
'A')");
+ // Expected sink row type: the primary-key column k is NOT NULL.
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
+ },
+ new String[] {"k", "v1"});
+ waitForResult(
+ Collections.singletonList("+I[1, A]"),
+ table,
+ rowType,
+ Collections.singletonList("k"));
+ }
+
private class SyncNewTableJob implements Runnable {
private final int ith;
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index db8e1744b..9568b1c3b 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -680,7 +680,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Specified primary key 'pk' does not exist in
source tables or computed columns [pt, _id, v1]."));
+ "For sink table "
+ + tableName
+ + ", not all specified primary keys
'[pk]' exist in source tables or computed columns '[pt, _id, v1]'."));
}
@Test
@@ -695,8 +697,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Primary keys are not specified. "
- + "Also, can't infer primary keys from
source table schemas because "
+ "Failed to set specified primary keys for sink
table "
+ + tableName
+ + ". Also, can't infer primary keys
from source table schemas because "
+ "source tables have no primary keys
or have different primary keys."));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
index be67cd20b..1e1524954 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
@@ -509,7 +509,9 @@ public class PostgresSyncTableActionITCase extends
PostgresActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Specified primary key 'pk' does not exist in
source tables or computed columns [pt, _id, v1]."));
+ "For sink table "
+ + tableName
+ + ", not all specified primary keys
'[pk]' exist in source tables or computed columns '[pt, _id, v1]'."));
}
@Test
@@ -525,8 +527,9 @@ public class PostgresSyncTableActionITCase extends
PostgresActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Primary keys are not specified. "
- + "Also, can't infer primary keys from
source table schemas because "
+ "Failed to set specified primary keys for sink
table "
+ + tableName
+ + ". Also, can't infer primary keys
from source table schemas because "
+ "source tables have no primary keys
or have different primary keys."));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/specify-keys/canal-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/specify-keys/canal-data-1.txt
new file mode 100644
index 000000000..392da92c6
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/specify-keys/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","part":"1","v1":"A"}],"database":"test_specify_keys","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","part":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"part":4,"v1":12},"table":"t1","ts":1684770072286,"type":"INSERT"}
+{"data":[{"k":"1","v1":"A"}],"database":"test_specify_keys","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t2","ts":1684770072286,"type":"INSERT"}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
index b882e6d0a..83ba935f0 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
@@ -494,3 +494,23 @@ CREATE TABLE t2 (
v1 VARCHAR(10),
PRIMARY KEY (k)
);
+
+--
################################################################################
+-- testSpecifyKeys
+--
################################################################################
+
+CREATE DATABASE test_specify_keys;
+USE test_specify_keys;
+
+-- t1 has a `part` column: the sync test expects the specified partition key
+-- (part) and primary keys (k, part) to be applied to its sink table.
+CREATE TABLE t1 (
+ k    INT PRIMARY KEY,
+ part INT,
+ v1   VARCHAR(10)
+);
+
+-- t2 has no `part` column: the sync test expects its sink table to be
+-- unpartitioned, with primary key (k) only.
+CREATE TABLE t2 (
+ k  INT PRIMARY KEY,
+ v1 VARCHAR(10)
+);
\ No newline at end of file