This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3237e1ab1 [cdc] Fix cdc job mistakenly changes immutable options of 
existing table (#3095)
3237e1ab1 is described below

commit 3237e1ab198b4101b547179f0c0ddd21974ec936
Author: Kerwin <[email protected]>
AuthorDate: Wed Mar 27 13:44:33 2024 +0800

    [cdc] Fix cdc job mistakenly changes immutable options of existing table 
(#3095)
---
 .../action/cdc/SynchronizationActionBase.java      | 23 +++++++++---
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 43 ++++++++++++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  |  9 ++++-
 3 files changed, 69 insertions(+), 6 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 684408f24..5c04d5707 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;
@@ -178,11 +179,23 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
 
     protected FileStoreTable alterTableOptions(Identifier identifier, 
FileStoreTable table) {
         // doesn't support altering bucket here
-        Map<String, String> withoutBucket = new HashMap<>(tableConfig);
-        withoutBucket.remove(CoreOptions.BUCKET.key());
-
+        Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
+        dynamicOptions.remove(CoreOptions.BUCKET.key());
+
+        // remove immutable options and options with equal values
+        Map<String, String> oldOptions = table.options();
+        Set<String> immutableOptionKeys = CoreOptions.getImmutableOptionKeys();
+        dynamicOptions
+                .entrySet()
+                .removeIf(
+                        entry ->
+                                immutableOptionKeys.contains(entry.getKey())
+                                        || Objects.equals(
+                                                
oldOptions.get(entry.getKey()), entry.getValue()));
+
+        // alter the table dynamic options
         List<SchemaChange> optionChanges =
-                withoutBucket.entrySet().stream()
+                dynamicOptions.entrySet().stream()
                         .map(entry -> SchemaChange.setOption(entry.getKey(), 
entry.getValue()))
                         .collect(Collectors.toList());
 
@@ -194,7 +207,7 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
             throw new RuntimeException("This is unexpected.", e);
         }
 
-        return table.copy(withoutBucket);
+        return table.copy(dynamicOptions);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 2ed3b6f46..fb54763cb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1097,6 +1097,49 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         assertThat(table.options()).containsAllEntriesOf(tableConfig);
     }
 
+    @Test
+    public void testOptionsChangeInExistingTable() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "1");
+        options.put("sink.parallelism", "1");
+        options.put("sequence.field", "_timestamp");
+
+        createFileStoreTable(
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.DATE(), 
DataTypes.TIMESTAMP(0)
+                        },
+                        new String[] {"pk", "_date", "_timestamp"}),
+                Collections.emptyList(),
+                Collections.singletonList("pk"),
+                options);
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "test_exist_options_change");
+        Map<String, String> tableConfig = new HashMap<>();
+        // update immutable options
+        tableConfig.put("sequence.field", "_date");
+        // update existing options
+        tableConfig.put("sink.parallelism", "2");
+        // add new options
+        tableConfig.put("snapshot.expire.limit", "1000");
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withPrimaryKeys("pk")
+                        .withTableConfig(tableConfig)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable();
+
+        assertThat(table.options().get("bucket")).isEqualTo("1");
+        
assertThat(table.options().get("sequence.field")).isEqualTo("_timestamp");
+        assertThat(table.options().get("sink.parallelism")).isEqualTo("2");
+        
assertThat(table.options().get("snapshot.expire.limit")).isEqualTo("1000");
+    }
+
     @Test
     @Timeout(60)
     public void testMetadataColumns() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 949f1c99d..b69661b60 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -309,6 +309,13 @@ CREATE TABLE test_options_change (
    PRIMARY KEY (pk)
 );
 
+CREATE TABLE test_exist_options_change (
+    pk INT,
+    _date DATE,
+    _timestamp TIMESTAMP,
+    PRIMARY KEY (pk)
+);
+
 -- 
################################################################################
 --  testSyncShard
 -- 
################################################################################
@@ -405,4 +412,4 @@ USE invalid_alter_bucket;
 
 CREATE TABLE t (
     k INT PRIMARY KEY
-);
\ No newline at end of file
+);

Reply via email to