This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new adc70cef [FLINK-28035] Support rescale overwrite
adc70cef is described below

commit adc70cef2da6b1750fd19b5ef70e7e06e490a4fa
Author: Jane Chan <[email protected]>
AuthorDate: Tue Jun 21 11:04:40 2022 +0800

    [FLINK-28035] Support rescale overwrite
    
    This closes #157
---
 .../store/connector/TableStoreFactoryOptions.java  |  12 --
 .../store/connector/AlterTableCompactITCase.java   |  31 +--
 .../store/connector/FileStoreTableITCase.java      |  80 ++++++-
 .../store/connector/ReadWriteTableITCase.java      |  49 +++--
 .../table/store/connector/RescaleBucketITCase.java | 240 +++++++++++++++++++++
 .../table/store/file/AppendOnlyFileStore.java      |  21 +-
 .../flink/table/store/file/KeyValueFileStore.java  |  21 +-
 .../file/operation/AbstractFileStoreScan.java      |  57 +++--
 .../file/operation/AppendOnlyFileStoreScan.java    |   6 +-
 .../file/operation/KeyValueFileStoreScan.java      |   6 +-
 10 files changed, 426 insertions(+), 97 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 18d04a68..38d6a209 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -35,17 +35,6 @@ public class TableStoreFactoryOptions {
                     .noDefaultValue()
                     .withDescription("The root file path of the table store in 
the filesystem.");
 
-    public static final ConfigOption<Boolean> COMPACTION_RESCALE_BUCKET =
-            ConfigOptions.key("compaction.rescale-bucket")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Specify the behavior for compaction. Set value to 
true "
-                                    + "will lead compaction to reorganize data 
files "
-                                    + "according to the bucket number from 
table schema. "
-                                    + "By default, compaction does not adjust 
the bucket number "
-                                    + "of a partition/table.");
-
     @Internal
     public static final ConfigOption<Boolean> COMPACTION_MANUAL_TRIGGERED =
             ConfigOptions.key("compaction.manual-triggered")
@@ -81,7 +70,6 @@ public class TableStoreFactoryOptions {
 
     public static Set<ConfigOption<?>> allOptions() {
         Set<ConfigOption<?>> allOptions = new HashSet<>();
-        allOptions.add(COMPACTION_RESCALE_BUCKET);
         allOptions.add(LOG_SYSTEM);
         allOptions.add(SINK_PARALLELISM);
         allOptions.add(SCAN_PARALLELISM);
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 0e4d8358..a3b9726b 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -18,15 +18,12 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
@@ -41,7 +38,6 @@ import java.util.Random;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static 
org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
 import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
 import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED;
 import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.SINGLE_PARTITIONED;
@@ -107,14 +103,14 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
         batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
         batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
         batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3, 
3), " + "(4, 4, 4)");
-        Snapshot snapshot = findLatestSnapshot("T0");
+        Snapshot snapshot = findLatestSnapshot("T0", true);
         assertThat(snapshot.id()).isEqualTo(6);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
 
         // decrease trigger
         batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = 
'1')");
         batchSql("ALTER TABLE T0 COMPACT");
-        assertThat(findLatestSnapshot("T0"))
+        assertThat(findLatestSnapshot("T0", true))
                 .usingComparator(Comparator.comparing(Snapshot::id))
                 .isEqualTo(snapshot);
     }
@@ -143,7 +139,7 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
                                     .map(kv -> kvAsString(kv, mode))
                                     .collect(Collectors.joining(",\n")));
             batchSql(insertQuery);
-            Snapshot snapshot = findLatestSnapshot(tableName);
+            Snapshot snapshot = findLatestSnapshot(tableName, true);
             
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
             latestSnapshot = snapshot.id();
             dataset.addAll(data);
@@ -185,7 +181,7 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
                                 .filter(kv -> partFilter(kv, part, mode))
                                 .map(kv -> convertToRow(kv, mode))
                                 .collect(Collectors.toList()));
-                latestSnapshot = findLatestSnapshot(tableName).id();
+                latestSnapshot = findLatestSnapshot(tableName, true).id();
             }
         }
     }
@@ -197,12 +193,12 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
             long latestSnapshot,
             List<Row> expectedData) {
         batchSql(compactQuery);
-        Snapshot snapshot = findLatestSnapshot(tableName);
+        Snapshot snapshot = findLatestSnapshot(tableName, true);
         assertThat(snapshot.id()).isEqualTo(latestSnapshot + 1);
         
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
         // check idempotence
         batchSql(compactQuery);
-        
assertThat(findLatestSnapshot(tableName).id()).isEqualTo(snapshot.id());
+        assertThat(findLatestSnapshot(tableName, 
true).id()).isEqualTo(snapshot.id());
 
         // read data
         List<Row> readData = batchSql(selectQuery);
@@ -296,19 +292,4 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
                 throw new UnsupportedOperationException("unsupported mode");
         }
     }
-
-    private Path getTableDirectory(String tableName) {
-        return new Path(
-                path
-                        + relativeTablePath(
-                                ObjectIdentifier.of(
-                                        bEnv.getCurrentCatalog(),
-                                        bEnv.getCurrentDatabase(),
-                                        tableName)));
-    }
-
-    private Snapshot findLatestSnapshot(String tableName) {
-        SnapshotManager snapshotManager = new 
SnapshotManager(getTableDirectory(tableName));
-        return snapshotManager.snapshot(snapshotManager.latestSnapshotId());
-    }
 }
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 002480fd..4e6e8c59 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -19,10 +19,23 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -31,6 +44,8 @@ import 
org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
 import org.junit.Before;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
@@ -38,6 +53,8 @@ import java.util.List;
 import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static 
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
 import static 
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static 
org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** ITCase for file store table api. */
 public abstract class FileStoreTableITCase extends AbstractTestBase {
@@ -52,15 +69,50 @@ public abstract class FileStoreTableITCase extends 
AbstractTestBase {
         sEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
         sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(100));
         path = TEMPORARY_FOLDER.newFolder().toURI().toString();
-        prepareEnv(bEnv, path);
-        prepareEnv(sEnv, path);
+        prepareConfiguration(bEnv, path);
+        prepareConfiguration(sEnv, path);
+        prepareEnv();
     }
 
-    private void prepareEnv(TableEnvironment env, String path) {
+    private void prepareConfiguration(TableEnvironment env, String path) {
         Configuration config = env.getConfig().getConfiguration();
         
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
         config.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), path);
-        ddl().forEach(env::executeSql);
+    }
+
+    private void prepareEnv() {
+        Parser parser = ((TableEnvironmentImpl) sEnv).getParser();
+        for (String ddl : ddl()) {
+            sEnv.executeSql(ddl);
+            List<Operation> operations = parser.parse(ddl);
+            if (operations.size() == 1) {
+                Operation operation = operations.get(0);
+                if (operation instanceof CreateCatalogOperation) {
+                    String name = ((CreateCatalogOperation) 
operation).getCatalogName();
+                    bEnv.registerCatalog(name, 
sEnv.getCatalog(name).orElse(null));
+                } else if (operation instanceof CreateTableOperation) {
+                    ObjectIdentifier tableIdentifier =
+                            ((CreateTableOperation) 
operation).getTableIdentifier();
+                    try {
+                        CatalogBaseTable table =
+                                
sEnv.getCatalog(tableIdentifier.getCatalogName())
+                                        .get()
+                                        
.getTable(tableIdentifier.toObjectPath());
+                        ((TableEnvironmentImpl) bEnv)
+                                .getCatalogManager()
+                                .getCatalog(tableIdentifier.getCatalogName())
+                                .get()
+                                .createTable(tableIdentifier.toObjectPath(), 
table, true);
+                    } catch (TableNotExistException
+                            | TableAlreadyExistException
+                            | DatabaseNotExistException e) {
+                        fail("This should not happen");
+                    }
+                } else {
+                    bEnv.executeSql(ddl);
+                }
+            }
+        }
     }
 
     protected abstract List<String> ddl();
@@ -78,4 +130,24 @@ public abstract class FileStoreTableITCase extends 
AbstractTestBase {
             throw new RuntimeException("Failed to collect the table result.", 
e);
         }
     }
+
+    protected Path getTableDirectory(String tableName, boolean managedTable) {
+        return new Path(
+                path
+                        + (managedTable
+                                ? relativeTablePath(
+                                        ObjectIdentifier.of(
+                                                bEnv.getCurrentCatalog(),
+                                                bEnv.getCurrentDatabase(),
+                                                tableName))
+                                : String.format("%s.db/%s", 
bEnv.getCurrentDatabase(), tableName)));
+    }
+
+    @Nullable
+    protected Snapshot findLatestSnapshot(String tableName, boolean 
managedTable) {
+        SnapshotManager snapshotManager =
+                new SnapshotManager(getTableDirectory(tableName, 
managedTable));
+        Long id = snapshotManager.latestSnapshotId();
+        return id == null ? null : snapshotManager.snapshot(id);
+    }
 }
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 97da71f6..10adddfb 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -1419,36 +1420,40 @@ public class ReadWriteTableITCase extends 
ReadWriteTableTestBase {
                 String.format(
                         "CREATE TABLE IF NOT EXISTS rates (\n"
                                 + "currency STRING,\n"
-                                + " rate BIGINT\n"
-                                + ") WITH (\n"
+                                + " rate BIGINT,\n"
+                                + " dt STRING\n"
+                                + ") PARTITIONED BY (dt)\n"
+                                + "WITH (\n"
                                 + " 'bucket' = '2',\n"
                                 + " 'root-path' = '%s'\n"
                                 + ")",
                         rootPath));
-        tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102)").await();
+        tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102, 
'2022-06-20')").await();
 
         // increase bucket num from 2 to 3
-        tEnv.executeSql("ALTER TABLE rates SET ('bucket' = '3')");
-        assertThatThrownBy(
-                        () -> tEnv.executeSql("INSERT INTO rates VALUES('US 
Dollar', 102)").await())
-                .hasRootCauseInstanceOf(IllegalStateException.class)
-                .hasRootCauseMessage(
-                        "Bucket number has been changed. Manifest might be 
corrupted.");
-        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM 
rates").await())
-                .hasRootCauseInstanceOf(IllegalStateException.class)
-                .hasRootCauseMessage(
-                        "Bucket number has been changed. Manifest might be 
corrupted.");
+        assertChangeBucketWithoutRescale(3);
 
         // decrease bucket num from 3 to 1
-        // TODO this test cannot work until alter table callback is 
implemented for managed table
-        /*
-        tEnv.executeSql("ALTER TABLE rates RESET ('bucket')");
-        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM 
rates").await())
-                .hasRootCauseInstanceOf(IllegalStateException.class)
-                .hasRootCauseMessage(
-                        "Bucket number has been changed. Manifest might be 
corrupted.");
-
-         */
+        assertChangeBucketWithoutRescale(1);
+    }
+
+    private void assertChangeBucketWithoutRescale(int bucketNum) throws 
Exception {
+        tEnv.executeSql(String.format("ALTER TABLE rates SET ('bucket' = 
'%d')", bucketNum));
+        // read is ok
+        assertThat(BlockingIterator.of(tEnv.executeSql("SELECT * FROM 
rates").collect()).collect())
+                .containsExactlyInAnyOrder(changelogRow("+I", "US Dollar", 
102L, "2022-06-20"));
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                                "INSERT INTO rates VALUES('US 
Dollar', 102, '2022-06-20')")
+                                        .await())
+                .getRootCause()
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        String.format(
+                                "Try to write partition {dt=2022-06-20} with a 
new bucket num %d, but the previous bucket num is 2. "
+                                        + "Please switch to batch mode, and 
perform INSERT OVERWRITE to rescale current data layout first.",
+                                bucketNum));
     }
 
     @Test
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
new file mode 100644
index 00000000..ce3f1677
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for overwrite data layout after changing num of bucket. */
+public class RescaleBucketITCase extends FileStoreTableITCase {
+
+    private final String alterTableSql = "ALTER TABLE %s SET ('bucket' = 
'%d')";
+
+    private final String rescaleOverwriteSql = "INSERT OVERWRITE %s SELECT * 
FROM %s";
+
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS 
`default_catalog`.`default_database`.`T0` (f0 INT) WITH ('bucket' = '2')",
+                String.format(
+                        "CREATE CATALOG `fs_catalog` WITH ('type' = 
'table-store', 'warehouse' = '%s')",
+                        path),
+                "CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`T1` (f0 
INT) WITH ('bucket' = '2')");
+    }
+
+    @Test
+    public void testRescaleManagedTable() {
+        innerTest("default_catalog", "T0", true);
+    }
+
+    @Test
+    public void testRescaleCatalogTable() {
+        innerTest("fs_catalog", "T1", false);
+    }
+
+    @Test
+    public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
+        sEnv.executeSql(
+                "CREATE TABLE IF NOT EXISTS 
`default_catalog`.`default_database`.`S0` (f0 INT) WITH ('connector' = 
'datagen')");
+        // register a companion table T4 for T3
+        executeBoth(
+                Arrays.asList(
+                        "USE CATALOG fs_catalog",
+                        "CREATE TABLE IF NOT EXISTS `T3` (f0 INT) WITH 
('bucket' = '2')",
+                        "CREATE TABLE IF NOT EXISTS `T4` (f0 INT)"));
+        SchemaManager schemaManager = new 
SchemaManager(getTableDirectory("T3", false));
+        assertLatestSchema(schemaManager, 0L, 2);
+
+        String streamSql =
+                "EXECUTE STATEMENT SET BEGIN\n "
+                        + "INSERT INTO `T3` SELECT * FROM 
`default_catalog`.`default_database`.`S0`;\n "
+                        + "INSERT INTO `T4` SELECT * FROM 
`default_catalog`.`default_database`.`S0`;\n"
+                        + "END";
+
+        
sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, 
path);
+        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+
+        // step1: run streaming insert
+        JobID jobId = startJobAndAssertStatusTransition(client, streamSql, 
null);
+
+        // step2: stop with savepoint
+        stopJobAndAssertStatusTransition(client, jobId);
+
+        final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3", false);
+        assertThat(snapshotBeforeRescale).isNotNull();
+        assertSnapshotSchema(schemaManager, snapshotBeforeRescale.schemaId(), 
0L, 2);
+        List<Row> committedData = batchSql("SELECT * FROM T3");
+
+        // step3: change bucket num
+        batchSql(alterTableSql, "T3", 4);
+        assertLatestSchema(schemaManager, 1L, 4);
+
+        // step4: rescale data layout according to the new bucket num
+        batchSql(rescaleOverwriteSql, "T3", "T3");
+        Snapshot snapshotAfterRescale = findLatestSnapshot("T3", false);
+        assertThat(snapshotAfterRescale).isNotNull();
+        
assertThat(snapshotAfterRescale.id()).isEqualTo(snapshotBeforeRescale.id() + 1);
+        
assertThat(snapshotAfterRescale.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+        assertSnapshotSchema(schemaManager, snapshotAfterRescale.schemaId(), 
1L, 4);
+        assertThat(batchSql("SELECT * FROM 
T3")).containsExactlyInAnyOrderElementsOf(committedData);
+
+        // step5: resume streaming job
+        JobID resumedJobId =
+                startJobAndAssertStatusTransition(client, streamSql, 
snapshotAfterRescale.id());
+        // stop job
+        stopJobAndAssertStatusTransition(client, resumedJobId);
+
+        // check snapshot and schema
+        Snapshot lastSnapshot = findLatestSnapshot("T3", false);
+        assertThat(lastSnapshot).isNotNull();
+        SnapshotManager snapshotManager = new 
SnapshotManager(getTableDirectory("T3", false));
+        for (long snapshotId = lastSnapshot.id();
+                snapshotId > snapshotAfterRescale.id();
+                snapshotId--) {
+            assertSnapshotSchema(
+                    schemaManager, 
snapshotManager.snapshot(snapshotId).schemaId(), 1L, 4);
+        }
+        // check data
+        assertThat(batchSql("SELECT * FROM T3"))
+                .containsExactlyInAnyOrderElementsOf(batchSql("SELECT * FROM 
T4"));
+    }
+
+    private void waitForTheNextSnapshot(@Nullable Long initSnapshotId) throws 
InterruptedException {
+        Snapshot snapshot = findLatestSnapshot("T3", false);
+        while (snapshot == null || new 
Long(snapshot.id()).equals(initSnapshotId)) {
+            Thread.sleep(2000L);
+            snapshot = findLatestSnapshot("T3", false);
+        }
+    }
+
+    private JobID startJobAndAssertStatusTransition(
+            ClusterClient<?> client, String sql, @Nullable Long 
initSnapshotId) throws Exception {
+        JobID jobId = sEnv.executeSql(sql).getJobClient().get().getJobID();
+        // let job run until the first snapshot is finished
+        waitForTheNextSnapshot(initSnapshotId);
+        
assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.RUNNING);
+        return jobId;
+    }
+
+    private void stopJobAndAssertStatusTransition(ClusterClient<?> client, 
JobID jobId)
+            throws ExecutionException, InterruptedException {
+        client.stopWithSavepoint(jobId, true, path, 
SavepointFormatType.DEFAULT);
+        while (client.getJobStatus(jobId).get() == JobStatus.RUNNING) {
+            Thread.sleep(2000L);
+        }
+        
assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.FINISHED);
+    }
+
+    private void assertLatestSchema(
+            SchemaManager schemaManager, long expectedSchemaId, int 
expectedBucketNum) {
+        assertThat(schemaManager.latest()).isPresent();
+        Schema schema = schemaManager.latest().get();
+        assertThat(schema.id()).isEqualTo(expectedSchemaId);
+        assertThat(schema.options()).containsEntry(BUCKET.key(), 
String.valueOf(expectedBucketNum));
+    }
+
+    private void assertSnapshotSchema(
+            SchemaManager schemaManager,
+            long schemaIdFromSnapshot,
+            long expectedSchemaId,
+            int expectedBucketNum) {
+        assertThat(schemaIdFromSnapshot).isEqualTo(expectedSchemaId);
+        Schema schema = schemaManager.schema(schemaIdFromSnapshot);
+        assertThat(schema.options()).containsEntry(BUCKET.key(), 
String.valueOf(expectedBucketNum));
+    }
+
+    private void innerTest(String catalogName, String tableName, boolean 
managedTable) {
+        String useCatalogSql = "USE CATALOG %s";
+        batchSql(useCatalogSql, catalogName);
+        String insertSql = "INSERT INTO %s VALUES (1), (2), (3), (4), (5)";
+        batchSql(insertSql, tableName);
+        Snapshot snapshot = findLatestSnapshot(tableName, managedTable);
+        assertThat(snapshot).isNotNull();
+
+        SchemaManager schemaManager = new 
SchemaManager(getTableDirectory(tableName, managedTable));
+        assertSnapshotSchema(schemaManager, snapshot.schemaId(), 0L, 2);
+
+        // for managed table schema id remains unchanged, for catalog table id 
increases from 0 to 1
+        batchSql(alterTableSql, tableName, 4);
+        if (managedTable) {
+            // managed table cannot update schema
+            assertLatestSchema(schemaManager, 0L, 2);
+        } else {
+            assertLatestSchema(schemaManager, 1L, 4);
+        }
+
+        // check read is not influenced
+        List<Row> expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), 
Row.of(4), Row.of(5));
+        assertThat(batchSql("SELECT * FROM %s", tableName))
+                .containsExactlyInAnyOrderElementsOf(expected);
+
+        // check write without rescale
+        assertThatThrownBy(() -> batchSql("INSERT INTO %s VALUES (6)", 
tableName))
+                .getRootCause()
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "Try to write table with a new bucket num 4, but the 
previous bucket num is 2. "
+                                + "Please switch to batch mode, and perform 
INSERT OVERWRITE to rescale current data layout first.");
+
+        batchSql(rescaleOverwriteSql, tableName, tableName);
+        snapshot = findLatestSnapshot(tableName, managedTable);
+        assertThat(snapshot).isNotNull();
+        assertThat(snapshot.id()).isEqualTo(2L);
+        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+        assertSnapshotSchema(
+                schemaManager, snapshot.schemaId(), managedTable ? 0L : 1L, 
managedTable ? 2 : 4);
+        assertThat(batchSql("SELECT * FROM %s", tableName))
+                .containsExactlyInAnyOrderElementsOf(expected);
+
+        // insert new data
+        batchSql("INSERT INTO %s VALUES(6)", tableName);
+        expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), Row.of(4), 
Row.of(5), Row.of(6));
+        assertThat(batchSql("SELECT * FROM %s", tableName))
+                .containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    private void executeBoth(List<String> sqlList) {
+        sqlList.forEach(
+                sql -> {
+                    sEnv.executeSql(sql);
+                    bEnv.executeSql(sql);
+                });
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index d4c0ac43..c9a8dec2 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -43,13 +43,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<RowData> {
 
     @Override
     public AppendOnlyFileStoreScan newScan() {
-        return new AppendOnlyFileStoreScan(
-                partitionType,
-                rowType,
-                snapshotManager(),
-                manifestFileFactory(),
-                manifestListFactory(),
-                options.bucket());
+        return newScan(false);
     }
 
     @Override
@@ -66,7 +60,18 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<RowData> {
                 options.fileFormat(),
                 pathFactory(),
                 snapshotManager(),
-                newScan(),
+                newScan(true),
                 options.mergeTreeOptions().targetFileSize);
     }
+
+    private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
+        return new AppendOnlyFileStoreScan(
+                partitionType,
+                rowType,
+                snapshotManager(),
+                manifestFileFactory(),
+                manifestListFactory(),
+                options.bucket(),
+                checkNumOfBuckets);
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 287f25d4..ec6a6335 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -56,13 +56,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public KeyValueFileStoreScan newScan() {
-        return new KeyValueFileStoreScan(
-                partitionType,
-                keyType,
-                snapshotManager(),
-                manifestFileFactory(),
-                manifestListFactory(),
-                options.bucket());
+        return newScan(false);
     }
 
     @Override
@@ -90,7 +84,18 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 options.fileFormat(),
                 pathFactory(),
                 snapshotManager(),
-                newScan(),
+                newScan(true),
                 options.mergeTreeOptions());
     }
+
+    private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
+        return new KeyValueFileStoreScan(
+                partitionType,
+                keyType,
+                snapshotManager(),
+                manifestFileFactory(),
+                manifestListFactory(),
+                options.bucket(),
+                checkNumOfBuckets);
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 02d28007..1a82f296 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -28,6 +30,7 @@ import org.apache.flink.table.store.file.predicate.Literal;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -54,6 +57,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private final ManifestFile.Factory manifestFileFactory;
     private final ManifestList manifestList;
     private final int numOfBuckets;
+    private final boolean checkNumOfBuckets;
 
     private Predicate partitionFilter;
 
@@ -67,13 +71,15 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
-            int numOfBuckets) {
+            int numOfBuckets,
+            boolean checkNumOfBuckets) {
         this.partitionStatsConverter = new 
FieldStatsArraySerializer(partitionType);
         this.partitionConverter = new 
RowDataToObjectArrayConverter(partitionType);
         this.snapshotManager = snapshotManager;
         this.manifestFileFactory = manifestFileFactory;
         this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
+        this.checkNumOfBuckets = checkNumOfBuckets;
     }
 
     @Override
@@ -181,9 +187,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
         for (ManifestEntry entry : entries) {
             ManifestEntry.Identifier identifier = entry.identifier();
-            Preconditions.checkState(
-                    entry.totalBuckets() == numOfBuckets,
-                    "Bucket number has been changed. Manifest might be 
corrupted.");
             switch (entry.kind()) {
                 case ADD:
                     Preconditions.checkState(
@@ -206,7 +209,35 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                             "Unknown value kind " + entry.kind().name());
             }
         }
-        List<ManifestEntry> files = new ArrayList<>(map.values());
+        List<ManifestEntry> files = new ArrayList<>();
+        for (ManifestEntry file : map.values()) {
+            if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
+                String partInfo =
+                        partitionConverter.getArity() > 0
+                                ? "partition "
+                                        + 
FileStorePathFactory.getPartitionComputer(
+                                                        
partitionConverter.rowType(),
+                                                        
FileSystemConnectorOptions
+                                                                
.PARTITION_DEFAULT_NAME
+                                                                
.defaultValue())
+                                                
.generatePartValues(file.partition())
+                                : "table";
+                throw new TableException(
+                        String.format(
+                                "Try to write %s with a new bucket num %d, but 
the previous bucket num is %d. "
+                                        + "Please switch to batch mode, and 
perform INSERT OVERWRITE to rescale current data layout first.",
+                                partInfo, numOfBuckets, file.totalBuckets()));
+            }
+
+            // The bucket filter must not be applied together with the
+            // partition filter: specifiedBucket is computed against the
+            // current numOfBuckets, whereas entry.bucket() was computed
+            // against the old numOfBuckets, so the filtered manifest
+            // entries might be empty, rendering the bucket check invalid.
+            if (filterByBucket(file)) {
+                files.add(file);
+            }
+        }
 
         return new Plan() {
             @Nullable
@@ -230,18 +261,16 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     private boolean filterManifestEntry(ManifestEntry entry) {
-        return filterByPartitionAndBucket(entry) && filterByStats(entry);
+        return filterByPartition(entry) && filterByStats(entry);
     }
 
-    private boolean filterByPartitionAndBucket(ManifestEntry entry) {
-        if (specifiedBucket != null) {
-            Preconditions.checkState(
-                    specifiedBucket < entry.totalBuckets(),
-                    "Bucket number has been changed. Manifest might be 
corrupted.");
-        }
+    private boolean filterByPartition(ManifestEntry entry) {
         return (partitionFilter == null
-                        || 
partitionFilter.test(partitionConverter.convert(entry.partition())))
-                && (specifiedBucket == null || entry.bucket() == 
specifiedBucket);
+                || 
partitionFilter.test(partitionConverter.convert(entry.partition())));
+    }
+
+    private boolean filterByBucket(ManifestEntry entry) {
+        return (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }
 
     protected abstract boolean filterByStats(ManifestEntry entry);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index 521a91e4..24ca55d8 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -39,13 +39,15 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
-            int numOfBuckets) {
+            int numOfBuckets,
+            boolean checkNumOfBuckets) {
         super(
                 partitionType,
                 snapshotManager,
                 manifestFileFactory,
                 manifestListFactory,
-                numOfBuckets);
+                numOfBuckets,
+                checkNumOfBuckets);
         this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index bdbf23fd..aa6bc701 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -39,13 +39,15 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
-            int numOfBuckets) {
+            int numOfBuckets,
+            boolean checkNumOfBuckets) {
         super(
                 partitionType,
                 snapshotManager,
                 manifestFileFactory,
                 manifestListFactory,
-                numOfBuckets);
+                numOfBuckets,
+                checkNumOfBuckets);
         this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
     }
 

Reply via email to