This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d31aaf0fe [core] Forbid to change bucket number through dynamic options (#2839)
d31aaf0fe is described below
commit d31aaf0fe6bb2d0b7b3b8adfccaee28c46c4ac81
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 1 12:04:47 2024 +0800
[core] Forbid to change bucket number through dynamic options (#2839)
---
.../paimon/table/AbstractFileStoreTable.java | 5 +++++
.../flink/action/cdc/SyncTableActionBase.java | 3 ++-
.../action/cdc/SynchronizationActionBase.java | 7 +++++++
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 2 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 24 ++++++++++++++++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 11 ++++++++++
6 files changed, 50 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 6eea9d2ae..bdc81f3c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -199,6 +199,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
(k, v) -> {
if (!Objects.equals(v, options.get(k))) {
SchemaManager.checkAlterTableOption(k);
+
+ if (CoreOptions.BUCKET.key().equals(k)) {
+ throw new UnsupportedOperationException(
+ "Cannot change bucket number through dynamic options. You might need to rescale bucket.");
+ }
}
});
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index a5aab32d0..ae9301362 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -122,7 +122,8 @@ public abstract class SyncTableActionBase extends SynchronizationActionBase {
Identifier identifier = new Identifier(database, table);
// Check if table exists before trying to get or create it
if (catalog.tableExists(identifier)) {
- fileStoreTable = (FileStoreTable) catalog.getTable(identifier).copy(tableConfig);
+ fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
+ fileStoreTable = copyOptionsWithoutBucket(fileStoreTable);
try {
Schema retrievedSchema = retrieveSchema();
computedColumns =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 826213de8..147803c35 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.action.cdc.watermark.CdcWatermarkStrategy;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -171,6 +172,12 @@ public abstract class SynchronizationActionBase extends ActionBase {
DataStream<RichCdcMultiplexRecord> input,
EventParser.Factory<RichCdcMultiplexRecord> parserFactory);
+ protected FileStoreTable copyOptionsWithoutBucket(FileStoreTable table) {
+ Map<String, String> toCopy = new HashMap<>(tableConfig);
+ toCopy.remove(CoreOptions.BUCKET.key());
+ return table.copy(toCopy);
+ }
+
@Override
public void run() throws Exception {
build();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 7e2b39ae6..70edca7e8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -155,7 +155,7 @@ public class MySqlSyncDatabaseAction extends SyncDatabaseActionBase {
true);
try {
table = (FileStoreTable) catalog.getTable(identifier);
- table = table.copy(tableConfig);
+ table = copyOptionsWithoutBucket(table);
Supplier<String> errMsg =
incompatibleMessage(table.schema(), tableInfo, identifier);
if (shouldMonitorTable(table.schema(), fromMySql, errMsg)) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 46726ae0d..a1afcec93 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -48,8 +48,10 @@ import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for {@link MySqlSyncTableAction}. */
@@ -1075,4 +1077,26 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
assertThat(table.primaryKeys()).containsExactly("id1", "part");
assertThat(table.partitionKeys()).containsExactly("part");
}
+
+ @Test
+ public void testInvalidAlterBucket() throws Exception {
+ // create table with bucket first
+ createFileStoreTable(
+ RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"k"}),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.singletonMap(BUCKET.key(), "1"));
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "invalid_alter_bucket");
+ mySqlConfig.put("table-name", "t");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withTableConfig(Collections.singletonMap(BUCKET.key(), "2"))
+ .build();
+
+ assertThatCode(action::build).doesNotThrowAnyException();
+ assertThat(action.fileStoreTable().options().get(BUCKET.key())).isEqualTo("1");
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 814ba8f2e..d119b1c89 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -374,4 +374,15 @@ CREATE TABLE t (
ID1 INT,
PART INT,
PRIMARY KEY (ID0)
+);
+
+-- ################################################################################
+-- testInvalidAlterBucket
+-- ################################################################################
+
+CREATE DATABASE invalid_alter_bucket;
+USE invalid_alter_bucket;
+
+CREATE TABLE t (
+ k INT PRIMARY KEY
);
\ No newline at end of file