This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 79bfb05ccf [flink-cdc] Provide option to disable use of source primary 
keys if primary keys in action command are not specified for CDC ingestion. 
(#5793)
79bfb05ccf is described below

commit 79bfb05ccfaa5b8aff3ffdeb422740a869c07c30
Author: Ashish Khatkar <akhatka...@gmail.com>
AuthorDate: Thu Jul 31 08:16:39 2025 +0100

    [flink-cdc] Provide option to disable use of source primary keys if primary 
keys in action command are not specified for CDC ingestion. (#5793)
---
 .../shortcodes/generated/kafka_sync_database.html  | 10 +++-
 .../shortcodes/generated/kafka_sync_table.html     |  4 ++
 .../generated/mongodb_sync_database.html           | 10 +++-
 .../shortcodes/generated/mysql_sync_database.html  | 10 +++-
 .../shortcodes/generated/mysql_sync_table.html     |  4 ++
 .../shortcodes/generated/postgres_sync_table.html  |  4 ++
 .../shortcodes/generated/pulsar_sync_database.html | 10 +++-
 .../shortcodes/generated/pulsar_sync_table.html    |  4 ++
 .../flink/action/cdc/CdcActionCommonUtils.java     | 15 ++++--
 .../cdc/MessageQueueSyncTableActionBase.java       |  4 +-
 .../flink/action/cdc/SyncDatabaseActionBase.java   |  1 +
 .../action/cdc/SyncDatabaseActionFactoryBase.java  |  6 +++
 .../flink/action/cdc/SyncTableActionBase.java      |  3 +-
 .../action/cdc/SyncTableActionFactoryBase.java     |  6 +++
 .../action/cdc/SynchronizationActionBase.java      |  8 ++++
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  3 +-
 .../flink/sink/cdc/NewTableSchemaBuilder.java      |  6 ++-
 .../flink/action/cdc/CdcActionITCaseBase.java      | 11 ++++-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 53 ++++++++++++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  | 13 +++++-
 20 files changed, 165 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 4b6ee3e38d..e8d5898c34 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -115,8 +115,14 @@ under the License.
     <tr>
         <td><h5>--primary_keys</h5></td>
         <td>The primary keys for Paimon table. If there are multiple primary 
keys, connect them with comma, for example "buyer_id,seller_id".
-            If the keys are not in source table, but the source table has 
primary keys, the sink table will use source table's primary keys.
-            Otherwise, the sink table won't set primary keys.</td>
+            If the keys are not provided, but the source has primary keys, the 
sink table will use source's primary keys.
+            Otherwise, the sink table won't set primary keys.
+            If the keys are not provided and the source has primary keys, but you don't want to use the source's primary keys,
+            set --sync_primary_keys_from_source_schema to false.</td>
+    </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
     </tr>
     <tr>
     <tr>
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_table.html 
b/docs/layouts/shortcodes/generated/kafka_sync_table.html
index 2345a4d84a..10669f594f 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_table.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_table.html
@@ -62,6 +62,10 @@ under the License.
             </ul>
         </td>
     </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
+    </tr>
     <tr>
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
Kafka topic's table field name. See <a 
href="../overview/#computed-functions">here</a> for a complete list of 
configurations. </td>
diff --git a/docs/layouts/shortcodes/generated/mongodb_sync_database.html 
b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
index e0a11fb0e8..2a854259e4 100644
--- a/docs/layouts/shortcodes/generated/mongodb_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mongodb_sync_database.html
@@ -57,8 +57,14 @@ under the License.
     <tr>
         <td><h5>--primary_keys</h5></td>
         <td>The primary keys for Paimon table. If there are multiple primary 
keys, connect them with comma, for example "buyer_id,seller_id".
-            If the keys are not in source table, but the source table has 
primary keys, the sink table will use source table's primary keys.
-            Otherwise, the sink table won't set primary keys.</td>
+            If the keys are not provided, but the source has primary keys, the 
sink table will use source's primary keys.
+            Otherwise, the sink table won't set primary keys.
+            If the keys are not provided and the source has primary keys, but you don't want to use the source's primary keys,
+            set --sync_primary_keys_from_source_schema to false.</td>
+    </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
     </tr>
     <tr>
         <td><h5>--mongodb_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html 
b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index a32fa920de..9b2fe648d2 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -89,8 +89,14 @@ under the License.
     <tr>
         <td><h5>--primary_keys</h5></td>
         <td>The primary keys for Paimon table. If there are multiple primary 
keys, connect them with comma, for example "buyer_id,seller_id".
-            If the keys are not in source table, but the source table has 
primary keys, the sink table will use source table's primary keys.
-            Otherwise, the sink table won't set primary keys.</td>
+            If the keys are not provided, but the source has primary keys, the 
sink table will use source's primary keys.
+            Otherwise, the sink table won't set primary keys.
+            If the keys are not provided and the source has primary keys, but you don't want to use the source's primary keys,
+            set --sync_primary_keys_from_source_schema to false.</td>
+    </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
     </tr>
     <tr>
         <td><h5>--mysql_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_table.html 
b/docs/layouts/shortcodes/generated/mysql_sync_table.html
index d66541ac64..25901973c9 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_table.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_table.html
@@ -61,6 +61,10 @@ under the License.
             </ul>
         </td>
     </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
+    </tr>
     <tr>
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
MySQL table field name. See <a href="../overview/#computed-functions">here</a> 
for a complete list of configurations. </td>
diff --git a/docs/layouts/shortcodes/generated/postgres_sync_table.html 
b/docs/layouts/shortcodes/generated/postgres_sync_table.html
index 6bc2cb7b2c..b660d20b15 100644
--- a/docs/layouts/shortcodes/generated/postgres_sync_table.html
+++ b/docs/layouts/shortcodes/generated/postgres_sync_table.html
@@ -54,6 +54,10 @@ under the License.
             </ul>
         </td>
     </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
+    </tr>
     <tr>
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
PostgreSQL table field name. See <a 
href="../overview/#computed-functions">here</a> for a complete list of 
configurations. </td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_database.html 
b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
index 58d95e88ed..9e87348362 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_database.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_database.html
@@ -77,8 +77,14 @@ under the License.
     <tr>
         <td><h5>--primary_keys</h5></td>
         <td>The primary keys for Paimon table. If there are multiple primary 
keys, connect them with comma, for example "buyer_id,seller_id".
-            If the keys are not in source table, but the source table has 
primary keys, the sink table will use source table's primary keys.
-            Otherwise, the sink table won't set primary keys.</td>
+            If the keys are not provided, but the source has primary keys, the 
sink table will use source's primary keys.
+            Otherwise, the sink table won't set primary keys.
+            If the keys are not provided and the source has primary keys, but you don't want to use the source's primary keys,
+            set --sync_primary_keys_from_source_schema to false.</td>
+    </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
     </tr>
     <tr>
         <td><h5>--pulsar_conf</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sync_table.html 
b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
index 4fc16910e0..30c0b4ae3c 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sync_table.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sync_table.html
@@ -61,6 +61,10 @@ under the License.
             </ul>
         </td>
     </tr>
+    <tr>
+        <td><h5>--sync_primary_keys_from_source_schema</h5></td>
+        <td>This is used to specify if primary keys from source should be used 
in paimon schema if primary keys using --primary_keys are not specified. The 
default is true.</td>
+    </tr>
     <tr>
         <td><h5>--computed_column</h5></td>
         <td>The definitions of computed columns. The argument field is from 
Pulsar topic's table field name. See <a 
href="../overview/#computed-functions">here</a> for a complete list of 
configurations. </td>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 46127c5f78..6a3001319c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -70,6 +70,8 @@ public class CdcActionCommonUtils {
     public static final String METADATA_COLUMN = "metadata_column";
     public static final String MULTIPLE_TABLE_PARTITION_KEYS = 
"multiple_table_partition_keys";
     public static final String EAGER_INIT = "eager_init";
+    public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA =
+            "sync_primary_keys_from_source_schema";
 
     public static void assertSchemaCompatible(
             TableSchema paimonSchema, List<DataField> sourceTableFields) {
@@ -122,7 +124,8 @@ public class CdcActionCommonUtils {
             CdcMetadataConverter[] metadataConverters,
             boolean caseSensitive,
             boolean strictlyCheckSpecified,
-            boolean requirePrimaryKeys) {
+            boolean requirePrimaryKeys,
+            boolean syncPKeysFromSourceSchema) {
         Schema.Builder builder = Schema.newBuilder();
 
         // options
@@ -165,7 +168,8 @@ public class CdcActionCommonUtils {
                 sourceSchemaPrimaryKeys,
                 allFieldNames,
                 strictlyCheckSpecified,
-                requirePrimaryKeys);
+                requirePrimaryKeys,
+                syncPKeysFromSourceSchema);
 
         // partition keys
         specifiedPartitionKeys = listCaseConvert(specifiedPartitionKeys, 
caseSensitive);
@@ -185,7 +189,8 @@ public class CdcActionCommonUtils {
             List<String> sourceSchemaPrimaryKeys,
             List<String> allFieldNames,
             boolean strictlyCheckSpecified,
-            boolean requirePrimaryKeys) {
+            boolean requirePrimaryKeys,
+            boolean syncPKeysFromSourceSchema) {
         if (!specifiedPrimaryKeys.isEmpty()) {
             if (allFieldNames.containsAll(specifiedPrimaryKeys)) {
                 builder.primaryKey(specifiedPrimaryKeys);
@@ -205,12 +210,12 @@ public class CdcActionCommonUtils {
             }
         }
 
-        if (!sourceSchemaPrimaryKeys.isEmpty()) {
+        if (syncPKeysFromSourceSchema && !sourceSchemaPrimaryKeys.isEmpty()) {
             builder.primaryKey(sourceSchemaPrimaryKeys);
             return;
         }
 
-        if (requirePrimaryKeys) {
+        if (requirePrimaryKeys && syncPKeysFromSourceSchema) {
             throw new IllegalArgumentException(
                     "Failed to set specified primary keys for sink table "
                             + tableName
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
index 1e1671b4a3..6dbb2b489d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java
@@ -52,7 +52,6 @@ import java.util.Map;
  * </ul>
  */
 public abstract class MessageQueueSyncTableActionBase extends 
SyncTableActionBase {
-
     public MessageQueueSyncTableActionBase(
             String database,
             String table,
@@ -87,6 +86,7 @@ public abstract class MessageQueueSyncTableActionBase extends 
SyncTableActionBas
                 metadataConverters,
                 caseSensitive,
                 true,
-                false);
+                false,
+                this.syncPKeysFromSourceSchema);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4ce4e7c250..36a9be033e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -202,6 +202,7 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
                         partitionKeys,
                         primaryKeys,
                         requirePrimaryKeys(),
+                        syncPKeysFromSourceSchema,
                         partitionKeyMultiple,
                         metadataConverters);
         Pattern tblIncludingPattern = Pattern.compile(includingTables);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index 8d0b8b9cef..28134375f6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -34,6 +34,7 @@ import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB;
@@ -86,5 +87,10 @@ public abstract class SyncDatabaseActionFactoryBase<T 
extends SyncDatabaseAction
             action.withComputedColumnArgs(
                     new 
ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
         }
+
+        if (params.has(SYNC_PKEYS_FROM_SOURCE_SCHEMA)) {
+            action.syncPKeysFromSourceSchema(
+                    
Boolean.parseBoolean(params.get(SYNC_PKEYS_FROM_SOURCE_SCHEMA)));
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index e0de071d29..0f984d3654 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -107,7 +107,8 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
                 metadataConverters,
                 caseSensitive,
                 true,
-                true);
+                true,
+                this.syncPKeysFromSourceSchema);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
index 08b9131f71..cb9b678d19 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionFactoryBase.java
@@ -31,6 +31,7 @@ import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_C
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
 
 /** Base {@link ActionFactory} for synchronizing into one Paimon table. */
@@ -76,5 +77,10 @@ public abstract class SyncTableActionFactoryBase
             String[] options = params.get(TYPE_MAPPING).split(",");
             action.withTypeMapping(TypeMapping.parse(options));
         }
+
+        if (params.has(SYNC_PKEYS_FROM_SOURCE_SCHEMA)) {
+            boolean flag = 
Boolean.parseBoolean(params.get(SYNC_PKEYS_FROM_SOURCE_SCHEMA));
+            action.syncPKeysFromSourceSchema(flag);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 446ce26cb3..d4b2b15394 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -68,6 +68,9 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
 
     protected Map<String, String> tableConfig = new HashMap<>();
     protected TypeMapping typeMapping = TypeMapping.defaultMapping();
+    // this is to specify if we should use primary keys from source
+    // in paimon schema if pkeys are not specified in action command
+    protected boolean syncPKeysFromSourceSchema = true;
     protected CdcMetadataConverter[] metadataConverters = new 
CdcMetadataConverter[] {};
 
     public SynchronizationActionBase(
@@ -102,6 +105,11 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
         return this;
     }
 
+    public SynchronizationActionBase syncPKeysFromSourceSchema(boolean flag) {
+        this.syncPKeysFromSourceSchema = flag;
+        return this;
+    }
+
     @VisibleForTesting
     public Map<String, String> tableConfig() {
         return tableConfig;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 790467ba0d..14238fd818 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -154,7 +154,8 @@ public class MySqlSyncDatabaseAction extends 
SyncDatabaseActionBase {
                             metadataConverters,
                             caseSensitive,
                             false,
-                            true);
+                            true,
+                            this.syncPKeysFromSourceSchema);
             try {
                 table = (FileStoreTable) catalog.getTable(identifier);
                 Supplier<String> errMsg =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index 0d4a74ac2f..1435cfd5da 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -38,6 +38,7 @@ public class NewTableSchemaBuilder implements Serializable {
     private final List<String> partitionKeys;
     private final List<String> primaryKeys;
     private final boolean requirePrimaryKeys;
+    private final boolean syncPKeysFromSourceSchema;
     private final CdcMetadataConverter[] metadataConverters;
     private final Map<String, List<String>> partitionKeyMultiple;
 
@@ -47,6 +48,7 @@ public class NewTableSchemaBuilder implements Serializable {
             List<String> partitionKeys,
             List<String> primaryKeys,
             boolean requirePrimaryKeys,
+            boolean syncPKeysFromSourceSchema,
             Map<String, List<String>> partitionKeyMultiple,
             CdcMetadataConverter[] metadataConverters) {
         this.tableConfig = tableConfig;
@@ -55,6 +57,7 @@ public class NewTableSchemaBuilder implements Serializable {
         this.partitionKeys = partitionKeys;
         this.primaryKeys = primaryKeys;
         this.requirePrimaryKeys = requirePrimaryKeys;
+        this.syncPKeysFromSourceSchema = syncPKeysFromSourceSchema;
         this.partitionKeyMultiple = partitionKeyMultiple;
     }
 
@@ -80,6 +83,7 @@ public class NewTableSchemaBuilder implements Serializable {
                         metadataConverters,
                         caseSensitive,
                         false,
-                        requirePrimaryKeys));
+                        requirePrimaryKeys,
+                        syncPKeysFromSourceSchema));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 855623b1af..312c323e6a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -326,6 +326,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
         private final List<String> computedColumnArgs = new ArrayList<>();
         private final List<String> typeMappingModes = new ArrayList<>();
         private final List<String> metadataColumns = new ArrayList<>();
+        private boolean syncPKeysFromSourceSchema = true;
 
         public SyncTableActionBuilder(Class<T> clazz, Map<String, String> 
sourceConfig) {
             this.clazz = clazz;
@@ -371,6 +372,11 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
             return this;
         }
 
+        public SyncTableActionBuilder<T> syncPKeysFromSourceSchema(boolean 
flag) {
+            this.syncPKeysFromSourceSchema = flag;
+            return this;
+        }
+
         public T build() {
             List<String> args =
                     new ArrayList<>(
@@ -381,7 +387,9 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
                                     "--database",
                                     database,
                                     "--table",
-                                    tableName));
+                                    tableName,
+                                    "--sync_primary_keys_from_source_schema",
+                                    
String.valueOf(syncPKeysFromSourceSchema)));
 
             args.addAll(mapToArgs(getConfKey(clazz), sourceConfig));
             args.addAll(mapToArgs("--catalog-conf", catalogConfig));
@@ -393,6 +401,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
 
             args.addAll(listToMultiArgs("--computed-column", 
computedColumnArgs));
             args.addAll(listToMultiArgs("--metadata-column", metadataColumns));
+            args.add("--use_pkeys_from_source_for_paimon_schema");
 
             return createAction(clazz, args);
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 171b489d42..b48a1c79cf 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1599,4 +1599,57 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
             waitForResult(expected, table, rowType, primaryKeys);
         }
     }
+
+    @Test
+    @Timeout(60)
+    public void testSyncPrimaryKeysFromSourceSchemaTrue() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", 
"check_sync_primary_keys_from_source_schema");
+        mySqlConfig.put("table-name", "t");
+
+        MySqlSyncTableAction action =
+                
syncTableActionBuilder(mySqlConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable();
+        TableSchema schema = table.schema();
+        assertThat(schema.primaryKeys().isEmpty()).isEqualTo(false);
+        
assertThat(schema.primaryKeys()).isEqualTo(Collections.singletonList("k"));
+
+        List<String> expectedInsert = Arrays.asList("+I[1, Apache]", "+I[2, 
Paimon]");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        waitForResult(expectedInsert, table, rowType, 
Collections.singletonList("k"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSyncPrimaryKeysFromSourceSchemaFalse() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", 
"check_sync_primary_keys_from_source_schema");
+        mySqlConfig.put("table-name", "t");
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        tableConfig.put("bucket-key", "v1");
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withTableConfig(tableConfig)
+                        .syncPKeysFromSourceSchema(false)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable();
+        TableSchema schema = table.schema();
+        assertThat(schema.primaryKeys().isEmpty()).isEqualTo(true);
+
+        List<String> expectedInsert = Arrays.asList("+I[1, Apache]", "+I[2, 
Paimon]");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        waitForResult(expectedInsert, table, rowType, Collections.emptyList());
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 66e0b776d0..ae0186cf70 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -456,4 +456,15 @@ USE check_cdc_sync_runtime_execution_mode;
 CREATE TABLE t (
                    k INT PRIMARY KEY,
                    v1 VARCHAR(10)
-);
\ No newline at end of file
+);
+
+-- 
################################################################################
+--  testSyncPrimaryKeysFromSourceSchema{True/False}
+-- 
################################################################################
+CREATE DATABASE check_sync_primary_keys_from_source_schema;
+USE check_sync_primary_keys_from_source_schema;
+CREATE TABLE t (
+    k INT PRIMARY KEY,
+    v1 VARCHAR(10)
+);
+INSERT INTO t VALUES (1, 'Apache'), (2, 'Paimon');

Reply via email to