This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3237e1ab1 [cdc] Fix cdc job mistakenly changes immutable options of
existing table (#3095)
3237e1ab1 is described below
commit 3237e1ab198b4101b547179f0c0ddd21974ec936
Author: Kerwin <[email protected]>
AuthorDate: Wed Mar 27 13:44:33 2024 +0800
[cdc] Fix cdc job mistakenly changes immutable options of existing table
(#3095)
---
.../action/cdc/SynchronizationActionBase.java | 23 +++++++++---
.../cdc/mysql/MySqlSyncTableActionITCase.java | 43 ++++++++++++++++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 9 ++++-
3 files changed, 69 insertions(+), 6 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 684408f24..5c04d5707 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;
@@ -178,11 +179,23 @@ public abstract class SynchronizationActionBase extends
ActionBase {
protected FileStoreTable alterTableOptions(Identifier identifier,
FileStoreTable table) {
// doesn't support altering bucket here
- Map<String, String> withoutBucket = new HashMap<>(tableConfig);
- withoutBucket.remove(CoreOptions.BUCKET.key());
-
+ Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
+ dynamicOptions.remove(CoreOptions.BUCKET.key());
+
+ // remove immutable options and options with equal values
+ Map<String, String> oldOptions = table.options();
+ Set<String> immutableOptionKeys = CoreOptions.getImmutableOptionKeys();
+ dynamicOptions
+ .entrySet()
+ .removeIf(
+ entry ->
+ immutableOptionKeys.contains(entry.getKey())
+ || Objects.equals(
+
oldOptions.get(entry.getKey()), entry.getValue()));
+
+ // alter the table dynamic options
List<SchemaChange> optionChanges =
- withoutBucket.entrySet().stream()
+ dynamicOptions.entrySet().stream()
.map(entry -> SchemaChange.setOption(entry.getKey(),
entry.getValue()))
.collect(Collectors.toList());
@@ -194,7 +207,7 @@ public abstract class SynchronizationActionBase extends
ActionBase {
throw new RuntimeException("This is unexpected.", e);
}
- return table.copy(withoutBucket);
+ return table.copy(dynamicOptions);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 2ed3b6f46..fb54763cb 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1097,6 +1097,49 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
assertThat(table.options()).containsAllEntriesOf(tableConfig);
}
+ @Test
+ public void testOptionsChangeInExistingTable() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "1");
+ options.put("sink.parallelism", "1");
+ options.put("sequence.field", "_timestamp");
+
+ createFileStoreTable(
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.DATE(),
DataTypes.TIMESTAMP(0)
+ },
+ new String[] {"pk", "_date", "_timestamp"}),
+ Collections.emptyList(),
+ Collections.singletonList("pk"),
+ options);
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_exist_options_change");
+ Map<String, String> tableConfig = new HashMap<>();
+ // update immutable options
+ tableConfig.put("sequence.field", "_date");
+ // update existing options
+ tableConfig.put("sink.parallelism", "2");
+ // add new options
+ tableConfig.put("snapshot.expire.limit", "1000");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withPrimaryKeys("pk")
+ .withTableConfig(tableConfig)
+ .build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable();
+
+ assertThat(table.options().get("bucket")).isEqualTo("1");
+
assertThat(table.options().get("sequence.field")).isEqualTo("_timestamp");
+ assertThat(table.options().get("sink.parallelism")).isEqualTo("2");
+
assertThat(table.options().get("snapshot.expire.limit")).isEqualTo("1000");
+ }
+
@Test
@Timeout(60)
public void testMetadataColumns() throws Exception {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 949f1c99d..b69661b60 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -309,6 +309,13 @@ CREATE TABLE test_options_change (
PRIMARY KEY (pk)
);
+CREATE TABLE test_exist_options_change (
+ pk INT,
+ _date DATE,
+ _timestamp TIMESTAMP,
+ PRIMARY KEY (pk)
+);
+
--
################################################################################
-- testSyncShard
--
################################################################################
@@ -405,4 +412,4 @@ USE invalid_alter_bucket;
CREATE TABLE t (
k INT PRIMARY KEY
-);
\ No newline at end of file
+);