This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d31aaf0fe [core] Forbid to change bucket number through dynamic options (#2839)
d31aaf0fe is described below

commit d31aaf0fe6bb2d0b7b3b8adfccaee28c46c4ac81
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 1 12:04:47 2024 +0800

    [core] Forbid to change bucket number through dynamic options (#2839)
---
 .../paimon/table/AbstractFileStoreTable.java       |  5 +++++
 .../flink/action/cdc/SyncTableActionBase.java      |  3 ++-
 .../action/cdc/SynchronizationActionBase.java      |  7 +++++++
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  2 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 24 ++++++++++++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  | 11 ++++++++++
 6 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 6eea9d2ae..bdc81f3c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -199,6 +199,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
                 (k, v) -> {
                     if (!Objects.equals(v, options.get(k))) {
                         SchemaManager.checkAlterTableOption(k);
+
+                        if (CoreOptions.BUCKET.key().equals(k)) {
+                            throw new UnsupportedOperationException(
+                                    "Cannot change bucket number through dynamic options. You might need to rescale bucket.");
+                        }
                     }
                 });
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index a5aab32d0..ae9301362 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -122,7 +122,8 @@ public abstract class SyncTableActionBase extends SynchronizationActionBase {
         Identifier identifier = new Identifier(database, table);
         // Check if table exists before trying to get or create it
         if (catalog.tableExists(identifier)) {
-            fileStoreTable = (FileStoreTable) catalog.getTable(identifier).copy(tableConfig);
+            fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
+            fileStoreTable = copyOptionsWithoutBucket(fileStoreTable);
             try {
                 Schema retrievedSchema = retrieveSchema();
                 computedColumns =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 826213de8..147803c35 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -171,6 +172,12 @@ public abstract class SynchronizationActionBase extends ActionBase {
             DataStream<RichCdcMultiplexRecord> input,
             EventParser.Factory<RichCdcMultiplexRecord> parserFactory);
 
+    protected FileStoreTable copyOptionsWithoutBucket(FileStoreTable table) {
+        Map<String, String> toCopy = new HashMap<>(tableConfig);
+        toCopy.remove(CoreOptions.BUCKET.key());
+        return table.copy(toCopy);
+    }
+
     @Override
     public void run() throws Exception {
         build();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 7e2b39ae6..70edca7e8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -155,7 +155,7 @@ public class MySqlSyncDatabaseAction extends SyncDatabaseActionBase {
                             true);
             try {
                 table = (FileStoreTable) catalog.getTable(identifier);
-                table = table.copy(tableConfig);
+                table = copyOptionsWithoutBucket(table);
                 Supplier<String> errMsg =
                         incompatibleMessage(table.schema(), tableInfo, identifier);
                 if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 46726ae0d..a1afcec93 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -48,8 +48,10 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT cases for {@link MySqlSyncTableAction}. */
@@ -1075,4 +1077,26 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         assertThat(table.primaryKeys()).containsExactly("id1", "part");
         assertThat(table.partitionKeys()).containsExactly("part");
     }
+
+    @Test
+    public void testInvalidAlterBucket() throws Exception {
+        // create table with bucket first
+        createFileStoreTable(
+                RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"k"}),
+                Collections.emptyList(),
+                Collections.singletonList("k"),
+                Collections.singletonMap(BUCKET.key(), "1"));
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "invalid_alter_bucket");
+        mySqlConfig.put("table-name", "t");
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withTableConfig(Collections.singletonMap(BUCKET.key(), "2"))
+                        .build();
+
+        assertThatCode(action::build).doesNotThrowAnyException();
+        assertThat(action.fileStoreTable().options().get(BUCKET.key())).isEqualTo("1");
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 814ba8f2e..d119b1c89 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -374,4 +374,15 @@ CREATE TABLE t (
     ID1 INT,
     PART INT,
     PRIMARY KEY (ID0)
+);
+
+-- ################################################################################
+--  testInvalidAlterBucket
+-- ################################################################################
+
+CREATE DATABASE invalid_alter_bucket;
+USE invalid_alter_bucket;
+
+CREATE TABLE t (
+    k INT PRIMARY KEY
 );
\ No newline at end of file

Reply via email to