This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 79bfb05ccf [flink-cdc] Provide option to disable use of source primary keys if primary keys in action command are not specified for CDC ingestion. (#5793) 79bfb05ccf is described below commit 79bfb05ccfaa5b8aff3ffdeb422740a869c07c30 Author: Ashish Khatkar <akhatka...@gmail.com> AuthorDate: Thu Jul 31 08:16:39 2025 +0100 [flink-cdc] Provide option to disable use of source primary keys if primary keys in action command are not specified for CDC ingestion. (#5793) --- .../shortcodes/generated/kafka_sync_database.html | 10 +++- .../shortcodes/generated/kafka_sync_table.html | 4 ++ .../generated/mongodb_sync_database.html | 10 +++- .../shortcodes/generated/mysql_sync_database.html | 10 +++- .../shortcodes/generated/mysql_sync_table.html | 4 ++ .../shortcodes/generated/postgres_sync_table.html | 4 ++ .../shortcodes/generated/pulsar_sync_database.html | 10 +++- .../shortcodes/generated/pulsar_sync_table.html | 4 ++ .../flink/action/cdc/CdcActionCommonUtils.java | 15 ++++-- .../cdc/MessageQueueSyncTableActionBase.java | 4 +- .../flink/action/cdc/SyncDatabaseActionBase.java | 1 + .../action/cdc/SyncDatabaseActionFactoryBase.java | 6 +++ .../flink/action/cdc/SyncTableActionBase.java | 3 +- .../action/cdc/SyncTableActionFactoryBase.java | 6 +++ .../action/cdc/SynchronizationActionBase.java | 8 ++++ .../action/cdc/mysql/MySqlSyncDatabaseAction.java | 3 +- .../flink/sink/cdc/NewTableSchemaBuilder.java | 6 ++- .../flink/action/cdc/CdcActionITCaseBase.java | 11 ++++- .../cdc/mysql/MySqlSyncTableActionITCase.java | 53 ++++++++++++++++++++++ .../src/test/resources/mysql/sync_table_setup.sql | 13 +++++- 20 files changed, 165 insertions(+), 20 deletions(-) diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html index 4b6ee3e38d..e8d5898c34 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_database.html +++ 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -115,8 +115,14 @@ under the License. <tr> <td><h5>--primary_keys</h5></td> <td>The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id". - If the keys are not in source table, but the source table has primary keys, the sink table will use source table's primary keys. - Otherwise, the sink table won't set primary keys.</td> + If the keys are not provided, but the source has primary keys, the sink table will use the source's primary keys. + Otherwise, the sink table won't set primary keys. + If the keys are not provided, but the source has primary keys, and you don't want to use the source's primary keys, + set --sync_primary_keys_from_source_schema to false.</td> + </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified. The default is true.</td> </tr> <tr> <tr> diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html b/docs/layouts/shortcodes/generated/kafka_sync_table.html index 2345a4d84a..10669f594f 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_table.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html @@ -62,6 +62,10 @@ under the License. </ul> </td> </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified. The default is true.</td> + </tr> <tr> <td><h5>--computed_column</h5></td> <td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations.
</td> diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html b/docs/layouts/shortcodes/generated/mongodb_sync_database.html index e0a11fb0e8..2a854259e4 100644 --- a/docs/layouts/shortcodes/generated/mongodb_sync_database.html +++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html @@ -57,8 +57,14 @@ under the License. <tr> <td><h5>--primary_keys</h5></td> <td>The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id". - If the keys are not in source table, but the source table has primary keys, the sink table will use source table's primary keys. - Otherwise, the sink table won't set primary keys.</td> + If the keys are not provided, but the source has primary keys, the sink table will use the source's primary keys. + Otherwise, the sink table won't set primary keys. + If the keys are not provided, but the source has primary keys, and you don't want to use the source's primary keys, + set --sync_primary_keys_from_source_schema to false.</td> + </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified. The default is true.</td> </tr> <tr> <td><h5>--mongodb_conf</h5></td> diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html index a32fa920de..9b2fe648d2 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_database.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html @@ -89,8 +89,14 @@ under the License. <tr> <td><h5>--primary_keys</h5></td> <td>The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id". - If the keys are not in source table, but the source table has primary keys, the sink table will use source table's primary keys.
- Otherwise, the sink table won't set primary keys.</td> + If the keys are not provided, but the source has primary keys, the sink table will use the source's primary keys. + Otherwise, the sink table won't set primary keys. + If the keys are not provided, but the source has primary keys, and you don't want to use the source's primary keys, + set --sync_primary_keys_from_source_schema to false.</td> + </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified. The default is true.</td> </tr> <tr> <td><h5>--mysql_conf</h5></td> diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html b/docs/layouts/shortcodes/generated/mysql_sync_table.html index d66541ac64..25901973c9 100644 --- a/docs/layouts/shortcodes/generated/mysql_sync_table.html +++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html @@ -61,6 +61,10 @@ under the License. </ul> </td> </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified. The default is true.</td> + </tr> <tr> <td><h5>--computed_column</h5></td> <td>The definitions of computed columns. The argument field is from MySQL table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td> diff --git a/docs/layouts/shortcodes/generated/postgres_sync_table.html b/docs/layouts/shortcodes/generated/postgres_sync_table.html index 6bc2cb7b2c..b660d20b15 100644 --- a/docs/layouts/shortcodes/generated/postgres_sync_table.html +++ b/docs/layouts/shortcodes/generated/postgres_sync_table.html @@ -54,6 +54,10 @@ under the License.
</ul> </td> </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified. The default is true.</td> + </tr> <tr> <td><h5>--computed_column</h5></td> <td>The definitions of computed columns. The argument field is from PostgreSQL table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td> diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_database.html b/docs/layouts/shortcodes/generated/pulsar_sync_database.html index 58d95e88ed..9e87348362 100644 --- a/docs/layouts/shortcodes/generated/pulsar_sync_database.html +++ b/docs/layouts/shortcodes/generated/pulsar_sync_database.html @@ -77,8 +77,14 @@ under the License. <tr> <td><h5>--primary_keys</h5></td> <td>The primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example "buyer_id,seller_id". - If the keys are not in source table, but the source table has primary keys, the sink table will use source table's primary keys. - Otherwise, the sink table won't set primary keys.</td> + If the keys are not provided, but the source has primary keys, the sink table will use the source's primary keys. + Otherwise, the sink table won't set primary keys. + If the keys are not provided, but the source has primary keys, and you don't want to use the source's primary keys, + set --sync_primary_keys_from_source_schema to false.</td> + </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified.
The default is true.</td> </tr> <tr> <td><h5>--pulsar_conf</h5></td> diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_table.html b/docs/layouts/shortcodes/generated/pulsar_sync_table.html index 4fc16910e0..30c0b4ae3c 100644 --- a/docs/layouts/shortcodes/generated/pulsar_sync_table.html +++ b/docs/layouts/shortcodes/generated/pulsar_sync_table.html @@ -61,6 +61,10 @@ under the License. </ul> </td> </tr> + <tr> + <td><h5>--sync_primary_keys_from_source_schema</h5></td> + <td>This is used to specify if primary keys from source should be used in paimon schema if primary keys using --primary_keys are not specified. The default is true.</td> + </tr> <tr> <td><h5>--computed_column</h5></td> <td>The definitions of computed columns. The argument field is from Pulsar topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td> diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 46127c5f78..6a3001319c 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -70,6 +70,8 @@ public class CdcActionCommonUtils { public static final String METADATA_COLUMN = "metadata_column"; public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys"; public static final String EAGER_INIT = "eager_init"; + public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA = + "sync_primary_keys_from_source_schema"; public static void assertSchemaCompatible( TableSchema paimonSchema, List<DataField> sourceTableFields) { @@ -122,7 +124,8 @@ public class CdcActionCommonUtils { CdcMetadataConverter[] metadataConverters, boolean caseSensitive, boolean 
strictlyCheckSpecified, - boolean requirePrimaryKeys) { + boolean requirePrimaryKeys, + boolean syncPKeysFromSourceSchema) { Schema.Builder builder = Schema.newBuilder(); // options @@ -165,7 +168,8 @@ public class CdcActionCommonUtils { sourceSchemaPrimaryKeys, allFieldNames, strictlyCheckSpecified, - requirePrimaryKeys); + requirePrimaryKeys, + syncPKeysFromSourceSchema); // partition keys specifiedPartitionKeys = listCaseConvert(specifiedPartitionKeys, caseSensitive); @@ -185,7 +189,8 @@ public class CdcActionCommonUtils { List<String> sourceSchemaPrimaryKeys, List<String> allFieldNames, boolean strictlyCheckSpecified, - boolean requirePrimaryKeys) { + boolean requirePrimaryKeys, + boolean syncPKeysFromSourceSchema) { if (!specifiedPrimaryKeys.isEmpty()) { if (allFieldNames.containsAll(specifiedPrimaryKeys)) { builder.primaryKey(specifiedPrimaryKeys); @@ -205,12 +210,12 @@ public class CdcActionCommonUtils { } } - if (!sourceSchemaPrimaryKeys.isEmpty()) { + if (syncPKeysFromSourceSchema && !sourceSchemaPrimaryKeys.isEmpty()) { builder.primaryKey(sourceSchemaPrimaryKeys); return; } - if (requirePrimaryKeys) { + if (requirePrimaryKeys && syncPKeysFromSourceSchema) { throw new IllegalArgumentException( "Failed to set specified primary keys for sink table " + tableName diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java index 1e1671b4a3..6dbb2b489d 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java @@ -52,7 +52,6 @@ import java.util.Map; * </ul> */ public abstract class MessageQueueSyncTableActionBase extends SyncTableActionBase { - public MessageQueueSyncTableActionBase( String 
database, String table, @@ -87,6 +86,7 @@ public abstract class MessageQueueSyncTableActionBase extends SyncTableActionBas metadataConverters, caseSensitive, true, - false); + false, + this.syncPKeysFromSourceSchema); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 4ce4e7c250..36a9be033e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -202,6 +202,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase { partitionKeys, primaryKeys, requirePrimaryKeys(), + syncPKeysFromSourceSchema, partitionKeyMultiple, metadataConverters); Pattern tblIncludingPattern = Pattern.compile(includingTables); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java index 8d0b8b9cef..28134375f6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -34,6 +34,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA; import 
static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB; @@ -86,5 +87,10 @@ public abstract class SyncDatabaseActionFactoryBase<T extends SyncDatabaseAction action.withComputedColumnArgs( new ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN))); } + + if (params.has(SYNC_PKEYS_FROM_SOURCE_SCHEMA)) { + action.syncPKeysFromSourceSchema( + Boolean.parseBoolean(params.get(SYNC_PKEYS_FROM_SOURCE_SCHEMA))); + } } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index e0de071d29..0f984d3654 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -107,7 +107,8 @@ public abstract class SyncTableActionBase extends SynchronizationActionBase { metadataConverters, caseSensitive, true, - true); + true, + this.syncPKeysFromSourceSchema); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java index 08b9131f71..cb9b678d19 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java @@ -31,6 +31,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_C import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN; import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; /** Base {@link ActionFactory} for synchronizing into one Paimon table. */ @@ -76,5 +77,10 @@ public abstract class SyncTableActionFactoryBase String[] options = params.get(TYPE_MAPPING).split(","); action.withTypeMapping(TypeMapping.parse(options)); } + + if (params.has(SYNC_PKEYS_FROM_SOURCE_SCHEMA)) { + boolean flag = Boolean.parseBoolean(params.get(SYNC_PKEYS_FROM_SOURCE_SCHEMA)); + action.syncPKeysFromSourceSchema(flag); + } } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 446ce26cb3..d4b2b15394 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -68,6 +68,9 @@ public abstract class SynchronizationActionBase extends ActionBase { protected Map<String, String> tableConfig = new HashMap<>(); protected TypeMapping typeMapping = TypeMapping.defaultMapping(); + // this is to specify if we should use primary keys from source + // in paimon schema if pkeys are not specified in action command + protected boolean syncPKeysFromSourceSchema = true; protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {}; public SynchronizationActionBase( @@ -102,6 +105,11 @@ public abstract class SynchronizationActionBase extends ActionBase { return this; } + public SynchronizationActionBase syncPKeysFromSourceSchema(boolean flag) { + 
this.syncPKeysFromSourceSchema = flag; + return this; + } + @VisibleForTesting public Map<String, String> tableConfig() { return tableConfig; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index 790467ba0d..14238fd818 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -154,7 +154,8 @@ public class MySqlSyncDatabaseAction extends SyncDatabaseActionBase { metadataConverters, caseSensitive, false, - true); + true, + this.syncPKeysFromSourceSchema); try { table = (FileStoreTable) catalog.getTable(identifier); Supplier<String> errMsg = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java index 0d4a74ac2f..1435cfd5da 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java @@ -38,6 +38,7 @@ public class NewTableSchemaBuilder implements Serializable { private final List<String> partitionKeys; private final List<String> primaryKeys; private final boolean requirePrimaryKeys; + private final boolean syncPKeysFromSourceSchema; private final CdcMetadataConverter[] metadataConverters; private final Map<String, List<String>> partitionKeyMultiple; @@ -47,6 +48,7 @@ public class NewTableSchemaBuilder implements Serializable { List<String> partitionKeys, List<String> primaryKeys, boolean requirePrimaryKeys, + boolean syncPKeysFromSourceSchema, Map<String, 
List<String>> partitionKeyMultiple, CdcMetadataConverter[] metadataConverters) { this.tableConfig = tableConfig; @@ -55,6 +57,7 @@ public class NewTableSchemaBuilder implements Serializable { this.partitionKeys = partitionKeys; this.primaryKeys = primaryKeys; this.requirePrimaryKeys = requirePrimaryKeys; + this.syncPKeysFromSourceSchema = syncPKeysFromSourceSchema; this.partitionKeyMultiple = partitionKeyMultiple; } @@ -80,6 +83,7 @@ public class NewTableSchemaBuilder implements Serializable { metadataConverters, caseSensitive, false, - requirePrimaryKeys)); + requirePrimaryKeys, + syncPKeysFromSourceSchema)); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java index 855623b1af..312c323e6a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java @@ -326,6 +326,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase { private final List<String> computedColumnArgs = new ArrayList<>(); private final List<String> typeMappingModes = new ArrayList<>(); private final List<String> metadataColumns = new ArrayList<>(); + private boolean syncPKeysFromSourceSchema = true; public SyncTableActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) { this.clazz = clazz; @@ -371,6 +372,11 @@ public class CdcActionITCaseBase extends ActionITCaseBase { return this; } + public SyncTableActionBuilder<T> syncPKeysFromSourceSchema(boolean flag) { + this.syncPKeysFromSourceSchema = flag; + return this; + } + public T build() { List<String> args = new ArrayList<>( @@ -381,7 +387,9 @@ public class CdcActionITCaseBase extends ActionITCaseBase { "--database", database, "--table", - tableName)); + tableName, + 
"--sync_primary_keys_from_source_schema", + String.valueOf(syncPKeysFromSourceSchema))); args.addAll(mapToArgs(getConfKey(clazz), sourceConfig)); args.addAll(mapToArgs("--catalog-conf", catalogConfig)); @@ -393,6 +401,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase { args.addAll(listToMultiArgs("--computed-column", computedColumnArgs)); args.addAll(listToMultiArgs("--metadata-column", metadataColumns)); + args.add("--use_pkeys_from_source_for_paimon_schema"); return createAction(clazz, args); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 171b489d42..b48a1c79cf 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -1599,4 +1599,57 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { waitForResult(expected, table, rowType, primaryKeys); } } + + @Test + @Timeout(60) + public void testSyncPrimaryKeysFromSourceSchemaTrue() throws Exception { + Map<String, String> mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "check_sync_primary_keys_from_source_schema"); + mySqlConfig.put("table-name", "t"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig).withTableConfig(getBasicTableConfig()).build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + TableSchema schema = table.schema(); + assertThat(schema.primaryKeys().isEmpty()).isEqualTo(false); + assertThat(schema.primaryKeys()).isEqualTo(Collections.singletonList("k")); + + List<String> expectedInsert = Arrays.asList("+I[1, Apache]", "+I[2, Paimon]"); + RowType rowType = + RowType.of( + new DataType[] 
{DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k", "v1"}); + waitForResult(expectedInsert, table, rowType, Collections.singletonList("k")); + } + + @Test + @Timeout(60) + public void testSyncPrimaryKeysFromSourceSchemaFalse() throws Exception { + Map<String, String> mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "check_sync_primary_keys_from_source_schema"); + mySqlConfig.put("table-name", "t"); + + Map<String, String> tableConfig = getBasicTableConfig(); + tableConfig.put("bucket-key", "v1"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig) + .withTableConfig(tableConfig) + .syncPKeysFromSourceSchema(false) + .build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(); + TableSchema schema = table.schema(); + assertThat(schema.primaryKeys().isEmpty()).isEqualTo(true); + + List<String> expectedInsert = Arrays.asList("+I[1, Apache]", "+I[2, Paimon]"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k", "v1"}); + waitForResult(expectedInsert, table, rowType, Collections.emptyList()); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 66e0b776d0..ae0186cf70 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -456,4 +456,15 @@ USE check_cdc_sync_runtime_execution_mode; CREATE TABLE t ( k INT PRIMARY KEY, v1 VARCHAR(10) -); \ No newline at end of file +); + +-- ################################################################################ +-- testSyncPrimaryKeysFromSourceSchema{True/False} +-- ################################################################################ +CREATE DATABASE check_sync_primary_keys_from_source_schema; +USE 
check_sync_primary_keys_from_source_schema; +CREATE TABLE t ( + k INT PRIMARY KEY, + v1 VARCHAR(10) +); +INSERT INTO t VALUES (1, 'Apache'), (2, 'Paimon');