This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new adc70cef [FLINK-28035] Support rescale overwrite
adc70cef is described below
commit adc70cef2da6b1750fd19b5ef70e7e06e490a4fa
Author: Jane Chan <[email protected]>
AuthorDate: Tue Jun 21 11:04:40 2022 +0800
[FLINK-28035] Support rescale overwrite
This closes #157
---
.../store/connector/TableStoreFactoryOptions.java | 12 --
.../store/connector/AlterTableCompactITCase.java | 31 +--
.../store/connector/FileStoreTableITCase.java | 80 ++++++-
.../store/connector/ReadWriteTableITCase.java | 49 +++--
.../table/store/connector/RescaleBucketITCase.java | 240 +++++++++++++++++++++
.../table/store/file/AppendOnlyFileStore.java | 21 +-
.../flink/table/store/file/KeyValueFileStore.java | 21 +-
.../file/operation/AbstractFileStoreScan.java | 57 +++--
.../file/operation/AppendOnlyFileStoreScan.java | 6 +-
.../file/operation/KeyValueFileStoreScan.java | 6 +-
10 files changed, 426 insertions(+), 97 deletions(-)
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 18d04a68..38d6a209 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -35,17 +35,6 @@ public class TableStoreFactoryOptions {
.noDefaultValue()
.withDescription("The root file path of the table store in
the filesystem.");
- public static final ConfigOption<Boolean> COMPACTION_RESCALE_BUCKET =
- ConfigOptions.key("compaction.rescale-bucket")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Specify the behavior for compaction. Set value to
true "
- + "will lead compaction to reorganize data
files "
- + "according to the bucket number from
table schema. "
- + "By default, compaction does not adjust
the bucket number "
- + "of a partition/table.");
-
@Internal
public static final ConfigOption<Boolean> COMPACTION_MANUAL_TRIGGERED =
ConfigOptions.key("compaction.manual-triggered")
@@ -81,7 +70,6 @@ public class TableStoreFactoryOptions {
public static Set<ConfigOption<?>> allOptions() {
Set<ConfigOption<?>> allOptions = new HashSet<>();
- allOptions.add(COMPACTION_RESCALE_BUCKET);
allOptions.add(LOG_SYSTEM);
allOptions.add(SINK_PARALLELISM);
allOptions.add(SCAN_PARALLELISM);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 0e4d8358..a3b9726b 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -18,15 +18,12 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -41,7 +38,6 @@ import java.util.Random;
import java.util.stream.Collectors;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
-import static
org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
import static
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
import static
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED;
import static
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.SINGLE_PARTITIONED;
@@ -107,14 +103,14 @@ public class AlterTableCompactITCase extends
FileStoreTableITCase {
batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3,
3), " + "(4, 4, 4)");
batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3,
3), " + "(4, 4, 4)");
batchSql("INSERT INTO T0 VALUES(1, 1, 1)," + "(2, 2, 2), " + "(3, 3,
3), " + "(4, 4, 4)");
- Snapshot snapshot = findLatestSnapshot("T0");
+ Snapshot snapshot = findLatestSnapshot("T0", true);
assertThat(snapshot.id()).isEqualTo(6);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
// decrease trigger
batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' =
'1')");
batchSql("ALTER TABLE T0 COMPACT");
- assertThat(findLatestSnapshot("T0"))
+ assertThat(findLatestSnapshot("T0", true))
.usingComparator(Comparator.comparing(Snapshot::id))
.isEqualTo(snapshot);
}
@@ -143,7 +139,7 @@ public class AlterTableCompactITCase extends
FileStoreTableITCase {
.map(kv -> kvAsString(kv, mode))
.collect(Collectors.joining(",\n")));
batchSql(insertQuery);
- Snapshot snapshot = findLatestSnapshot(tableName);
+ Snapshot snapshot = findLatestSnapshot(tableName, true);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
latestSnapshot = snapshot.id();
dataset.addAll(data);
@@ -185,7 +181,7 @@ public class AlterTableCompactITCase extends
FileStoreTableITCase {
.filter(kv -> partFilter(kv, part, mode))
.map(kv -> convertToRow(kv, mode))
.collect(Collectors.toList()));
- latestSnapshot = findLatestSnapshot(tableName).id();
+ latestSnapshot = findLatestSnapshot(tableName, true).id();
}
}
}
@@ -197,12 +193,12 @@ public class AlterTableCompactITCase extends
FileStoreTableITCase {
long latestSnapshot,
List<Row> expectedData) {
batchSql(compactQuery);
- Snapshot snapshot = findLatestSnapshot(tableName);
+ Snapshot snapshot = findLatestSnapshot(tableName, true);
assertThat(snapshot.id()).isEqualTo(latestSnapshot + 1);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
// check idempotence
batchSql(compactQuery);
-
assertThat(findLatestSnapshot(tableName).id()).isEqualTo(snapshot.id());
+ assertThat(findLatestSnapshot(tableName,
true).id()).isEqualTo(snapshot.id());
// read data
List<Row> readData = batchSql(selectQuery);
@@ -296,19 +292,4 @@ public class AlterTableCompactITCase extends
FileStoreTableITCase {
throw new UnsupportedOperationException("unsupported mode");
}
}
-
- private Path getTableDirectory(String tableName) {
- return new Path(
- path
- + relativeTablePath(
- ObjectIdentifier.of(
- bEnv.getCurrentCatalog(),
- bEnv.getCurrentDatabase(),
- tableName)));
- }
-
- private Snapshot findLatestSnapshot(String tableName) {
- SnapshotManager snapshotManager = new
SnapshotManager(getTableDirectory(tableName));
- return snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- }
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
index 002480fd..4e6e8c59 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -19,10 +19,23 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -31,6 +44,8 @@ import
org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.junit.Before;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.time.Duration;
import java.util.List;
@@ -38,6 +53,8 @@ import java.util.List;
import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.ROOT_PATH;
import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+import static
org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
+import static org.junit.jupiter.api.Assertions.fail;
/** ITCase for file store table api. */
public abstract class FileStoreTableITCase extends AbstractTestBase {
@@ -52,15 +69,50 @@ public abstract class FileStoreTableITCase extends
AbstractTestBase {
sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
path = TEMPORARY_FOLDER.newFolder().toURI().toString();
- prepareEnv(bEnv, path);
- prepareEnv(sEnv, path);
+ prepareConfiguration(bEnv, path);
+ prepareConfiguration(sEnv, path);
+ prepareEnv();
}
- private void prepareEnv(TableEnvironment env, String path) {
+ private void prepareConfiguration(TableEnvironment env, String path) {
Configuration config = env.getConfig().getConfiguration();
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
config.setString(TABLE_STORE_PREFIX + ROOT_PATH.key(), path);
- ddl().forEach(env::executeSql);
+ }
+
+ private void prepareEnv() {
+ Parser parser = ((TableEnvironmentImpl) sEnv).getParser();
+ for (String ddl : ddl()) {
+ sEnv.executeSql(ddl);
+ List<Operation> operations = parser.parse(ddl);
+ if (operations.size() == 1) {
+ Operation operation = operations.get(0);
+ if (operation instanceof CreateCatalogOperation) {
+ String name = ((CreateCatalogOperation)
operation).getCatalogName();
+ bEnv.registerCatalog(name,
sEnv.getCatalog(name).orElse(null));
+ } else if (operation instanceof CreateTableOperation) {
+ ObjectIdentifier tableIdentifier =
+ ((CreateTableOperation)
operation).getTableIdentifier();
+ try {
+ CatalogBaseTable table =
+
sEnv.getCatalog(tableIdentifier.getCatalogName())
+ .get()
+
.getTable(tableIdentifier.toObjectPath());
+ ((TableEnvironmentImpl) bEnv)
+ .getCatalogManager()
+ .getCatalog(tableIdentifier.getCatalogName())
+ .get()
+ .createTable(tableIdentifier.toObjectPath(),
table, true);
+ } catch (TableNotExistException
+ | TableAlreadyExistException
+ | DatabaseNotExistException e) {
+ fail("This should not happen");
+ }
+ } else {
+ bEnv.executeSql(ddl);
+ }
+ }
+ }
}
protected abstract List<String> ddl();
@@ -78,4 +130,24 @@ public abstract class FileStoreTableITCase extends
AbstractTestBase {
throw new RuntimeException("Failed to collect the table result.",
e);
}
}
+
+ protected Path getTableDirectory(String tableName, boolean managedTable) {
+ return new Path(
+ path
+ + (managedTable
+ ? relativeTablePath(
+ ObjectIdentifier.of(
+ bEnv.getCurrentCatalog(),
+ bEnv.getCurrentDatabase(),
+ tableName))
+ : String.format("%s.db/%s",
bEnv.getCurrentDatabase(), tableName)));
+ }
+
+ @Nullable
+ protected Snapshot findLatestSnapshot(String tableName, boolean
managedTable) {
+ SnapshotManager snapshotManager =
+ new SnapshotManager(getTableDirectory(tableName,
managedTable));
+ Long id = snapshotManager.latestSnapshotId();
+ return id == null ? null : snapshotManager.snapshot(id);
+ }
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 97da71f6..10adddfb 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -1419,36 +1420,40 @@ public class ReadWriteTableITCase extends
ReadWriteTableTestBase {
String.format(
"CREATE TABLE IF NOT EXISTS rates (\n"
+ "currency STRING,\n"
- + " rate BIGINT\n"
- + ") WITH (\n"
+ + " rate BIGINT,\n"
+ + " dt STRING\n"
+ + ") PARTITIONED BY (dt)\n"
+ + "WITH (\n"
+ " 'bucket' = '2',\n"
+ " 'root-path' = '%s'\n"
+ ")",
rootPath));
- tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102)").await();
+ tEnv.executeSql("INSERT INTO rates VALUES('US Dollar', 102,
'2022-06-20')").await();
// increase bucket num from 2 to 3
- tEnv.executeSql("ALTER TABLE rates SET ('bucket' = '3')");
- assertThatThrownBy(
- () -> tEnv.executeSql("INSERT INTO rates VALUES('US
Dollar', 102)").await())
- .hasRootCauseInstanceOf(IllegalStateException.class)
- .hasRootCauseMessage(
- "Bucket number has been changed. Manifest might be
corrupted.");
- assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM
rates").await())
- .hasRootCauseInstanceOf(IllegalStateException.class)
- .hasRootCauseMessage(
- "Bucket number has been changed. Manifest might be
corrupted.");
+ assertChangeBucketWithoutRescale(3);
// decrease bucket num from 3 to 1
- // TODO this test cannot work until alter table callback is
implemented for managed table
- /*
- tEnv.executeSql("ALTER TABLE rates RESET ('bucket')");
- assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM
rates").await())
- .hasRootCauseInstanceOf(IllegalStateException.class)
- .hasRootCauseMessage(
- "Bucket number has been changed. Manifest might be
corrupted.");
-
- */
+ assertChangeBucketWithoutRescale(1);
+ }
+
+ private void assertChangeBucketWithoutRescale(int bucketNum) throws
Exception {
+ tEnv.executeSql(String.format("ALTER TABLE rates SET ('bucket' =
'%d')", bucketNum));
+ // read is ok
+ assertThat(BlockingIterator.of(tEnv.executeSql("SELECT * FROM
rates").collect()).collect())
+ .containsExactlyInAnyOrder(changelogRow("+I", "US Dollar",
102L, "2022-06-20"));
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ "INSERT INTO rates VALUES('US
Dollar', 102, '2022-06-20')")
+ .await())
+ .getRootCause()
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ String.format(
+ "Try to write partition {dt=2022-06-20} with a
new bucket num %d, but the previous bucket num is 2. "
+ + "Please switch to batch mode, and
perform INSERT OVERWRITE to rescale current data layout first.",
+ bucketNum));
}
@Test
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
new file mode 100644
index 00000000..ce3f1677
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for overwriting the data layout after changing the number of buckets. */
+public class RescaleBucketITCase extends FileStoreTableITCase {
+
+ private final String alterTableSql = "ALTER TABLE %s SET ('bucket' =
'%d')";
+
+ private final String rescaleOverwriteSql = "INSERT OVERWRITE %s SELECT *
FROM %s";
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS
`default_catalog`.`default_database`.`T0` (f0 INT) WITH ('bucket' = '2')",
+ String.format(
+ "CREATE CATALOG `fs_catalog` WITH ('type' =
'table-store', 'warehouse' = '%s')",
+ path),
+ "CREATE TABLE IF NOT EXISTS `fs_catalog`.`default`.`T1` (f0
INT) WITH ('bucket' = '2')");
+ }
+
+ @Test
+ public void testRescaleManagedTable() {
+ innerTest("default_catalog", "T0", true);
+ }
+
+ @Test
+ public void testRescaleCatalogTable() {
+ innerTest("fs_catalog", "T1", false);
+ }
+
+ @Test
+ public void testSuspendAndRecoverAfterRescaleOverwrite() throws Exception {
+ sEnv.executeSql(
+ "CREATE TABLE IF NOT EXISTS
`default_catalog`.`default_database`.`S0` (f0 INT) WITH ('connector' =
'datagen')");
+ // register a companion table T4 for T3
+ executeBoth(
+ Arrays.asList(
+ "USE CATALOG fs_catalog",
+ "CREATE TABLE IF NOT EXISTS `T3` (f0 INT) WITH
('bucket' = '2')",
+ "CREATE TABLE IF NOT EXISTS `T4` (f0 INT)"));
+ SchemaManager schemaManager = new
SchemaManager(getTableDirectory("T3", false));
+ assertLatestSchema(schemaManager, 0L, 2);
+
+ String streamSql =
+ "EXECUTE STATEMENT SET BEGIN\n "
+ + "INSERT INTO `T3` SELECT * FROM
`default_catalog`.`default_database`.`S0`;\n "
+ + "INSERT INTO `T4` SELECT * FROM
`default_catalog`.`default_database`.`S0`;\n"
+ + "END";
+
+
sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH,
path);
+ ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+
+ // step1: run streaming insert
+ JobID jobId = startJobAndAssertStatusTransition(client, streamSql,
null);
+
+ // step2: stop with savepoint
+ stopJobAndAssertStatusTransition(client, jobId);
+
+ final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3", false);
+ assertThat(snapshotBeforeRescale).isNotNull();
+ assertSnapshotSchema(schemaManager, snapshotBeforeRescale.schemaId(),
0L, 2);
+ List<Row> committedData = batchSql("SELECT * FROM T3");
+
+ // step3: change bucket num and check the latest schema
+ batchSql(alterTableSql, "T3", 4);
+ assertLatestSchema(schemaManager, 1L, 4);
+
+ // step4: rescale data layout according to the new bucket num
+ batchSql(rescaleOverwriteSql, "T3", "T3");
+ Snapshot snapshotAfterRescale = findLatestSnapshot("T3", false);
+ assertThat(snapshotAfterRescale).isNotNull();
+
assertThat(snapshotAfterRescale.id()).isEqualTo(snapshotBeforeRescale.id() + 1);
+
assertThat(snapshotAfterRescale.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+ assertSnapshotSchema(schemaManager, snapshotAfterRescale.schemaId(),
1L, 4);
+ assertThat(batchSql("SELECT * FROM
T3")).containsExactlyInAnyOrderElementsOf(committedData);
+
+ // step5: resume streaming job
+ JobID resumedJobId =
+ startJobAndAssertStatusTransition(client, streamSql,
snapshotAfterRescale.id());
+ // stop job
+ stopJobAndAssertStatusTransition(client, resumedJobId);
+
+ // check snapshot and schema
+ Snapshot lastSnapshot = findLatestSnapshot("T3", false);
+ assertThat(lastSnapshot).isNotNull();
+ SnapshotManager snapshotManager = new
SnapshotManager(getTableDirectory("T3", false));
+ for (long snapshotId = lastSnapshot.id();
+ snapshotId > snapshotAfterRescale.id();
+ snapshotId--) {
+ assertSnapshotSchema(
+ schemaManager,
snapshotManager.snapshot(snapshotId).schemaId(), 1L, 4);
+ }
+ // check data
+ assertThat(batchSql("SELECT * FROM T3"))
+ .containsExactlyInAnyOrderElementsOf(batchSql("SELECT * FROM
T4"));
+ }
+
+ private void waitForTheNextSnapshot(@Nullable Long initSnapshotId) throws
InterruptedException {
+ Snapshot snapshot = findLatestSnapshot("T3", false);
+ while (snapshot == null || new
Long(snapshot.id()).equals(initSnapshotId)) {
+ Thread.sleep(2000L);
+ snapshot = findLatestSnapshot("T3", false);
+ }
+ }
+
+ private JobID startJobAndAssertStatusTransition(
+ ClusterClient<?> client, String sql, @Nullable Long
initSnapshotId) throws Exception {
+ JobID jobId = sEnv.executeSql(sql).getJobClient().get().getJobID();
+ // let the job run until a snapshot other than initSnapshotId is finished
+ waitForTheNextSnapshot(initSnapshotId);
+
assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.RUNNING);
+ return jobId;
+ }
+
+ private void stopJobAndAssertStatusTransition(ClusterClient<?> client,
JobID jobId)
+ throws ExecutionException, InterruptedException {
+ client.stopWithSavepoint(jobId, true, path,
SavepointFormatType.DEFAULT);
+ while (client.getJobStatus(jobId).get() == JobStatus.RUNNING) {
+ Thread.sleep(2000L);
+ }
+
assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.FINISHED);
+ }
+
+ private void assertLatestSchema(
+ SchemaManager schemaManager, long expectedSchemaId, int
expectedBucketNum) {
+ assertThat(schemaManager.latest()).isPresent();
+ Schema schema = schemaManager.latest().get();
+ assertThat(schema.id()).isEqualTo(expectedSchemaId);
+ assertThat(schema.options()).containsEntry(BUCKET.key(),
String.valueOf(expectedBucketNum));
+ }
+
+ private void assertSnapshotSchema(
+ SchemaManager schemaManager,
+ long schemaIdFromSnapshot,
+ long expectedSchemaId,
+ int expectedBucketNum) {
+ assertThat(schemaIdFromSnapshot).isEqualTo(expectedSchemaId);
+ Schema schema = schemaManager.schema(schemaIdFromSnapshot);
+ assertThat(schema.options()).containsEntry(BUCKET.key(),
String.valueOf(expectedBucketNum));
+ }
+
+ private void innerTest(String catalogName, String tableName, boolean
managedTable) {
+ String useCatalogSql = "USE CATALOG %s";
+ batchSql(useCatalogSql, catalogName);
+ String insertSql = "INSERT INTO %s VALUES (1), (2), (3), (4), (5)";
+ batchSql(insertSql, tableName);
+ Snapshot snapshot = findLatestSnapshot(tableName, managedTable);
+ assertThat(snapshot).isNotNull();
+
+ SchemaManager schemaManager = new
SchemaManager(getTableDirectory(tableName, managedTable));
+ assertSnapshotSchema(schemaManager, snapshot.schemaId(), 0L, 2);
+
+ // for a managed table the schema id remains unchanged; for a catalog table the id
increases from 0 to 1
+ batchSql(alterTableSql, tableName, 4);
+ if (managedTable) {
+ // managed table cannot update schema
+ assertLatestSchema(schemaManager, 0L, 2);
+ } else {
+ assertLatestSchema(schemaManager, 1L, 4);
+ }
+
+ // check read is not influenced
+ List<Row> expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3),
Row.of(4), Row.of(5));
+ assertThat(batchSql("SELECT * FROM %s", tableName))
+ .containsExactlyInAnyOrderElementsOf(expected);
+
+ // check write without rescale
+ assertThatThrownBy(() -> batchSql("INSERT INTO %s VALUES (6)",
tableName))
+ .getRootCause()
+ .isInstanceOf(TableException.class)
+ .hasMessage(
+ "Try to write table with a new bucket num 4, but the
previous bucket num is 2. "
+ + "Please switch to batch mode, and perform
INSERT OVERWRITE to rescale current data layout first.");
+
+ batchSql(rescaleOverwriteSql, tableName, tableName);
+ snapshot = findLatestSnapshot(tableName, managedTable);
+ assertThat(snapshot).isNotNull();
+ assertThat(snapshot.id()).isEqualTo(2L);
+
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+ assertSnapshotSchema(
+ schemaManager, snapshot.schemaId(), managedTable ? 0L : 1L,
managedTable ? 2 : 4);
+ assertThat(batchSql("SELECT * FROM %s", tableName))
+ .containsExactlyInAnyOrderElementsOf(expected);
+
+ // insert new data
+ batchSql("INSERT INTO %s VALUES(6)", tableName);
+ expected = Arrays.asList(Row.of(1), Row.of(2), Row.of(3), Row.of(4),
Row.of(5), Row.of(6));
+ assertThat(batchSql("SELECT * FROM %s", tableName))
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ private void executeBoth(List<String> sqlList) {
+ sqlList.forEach(
+ sql -> {
+ sEnv.executeSql(sql);
+ bEnv.executeSql(sql);
+ });
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index d4c0ac43..c9a8dec2 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -43,13 +43,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<RowData> {
@Override
public AppendOnlyFileStoreScan newScan() {
- return new AppendOnlyFileStoreScan(
- partitionType,
- rowType,
- snapshotManager(),
- manifestFileFactory(),
- manifestListFactory(),
- options.bucket());
+ return newScan(false);
}
@Override
@@ -66,7 +60,18 @@ public class AppendOnlyFileStore extends
AbstractFileStore<RowData> {
options.fileFormat(),
pathFactory(),
snapshotManager(),
- newScan(),
+ newScan(true),
options.mergeTreeOptions().targetFileSize);
}
+
+ private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
+ return new AppendOnlyFileStoreScan(
+ partitionType,
+ rowType,
+ snapshotManager(),
+ manifestFileFactory(),
+ manifestListFactory(),
+ options.bucket(),
+ checkNumOfBuckets);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 287f25d4..ec6a6335 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -56,13 +56,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public KeyValueFileStoreScan newScan() {
- return new KeyValueFileStoreScan(
- partitionType,
- keyType,
- snapshotManager(),
- manifestFileFactory(),
- manifestListFactory(),
- options.bucket());
+ return newScan(false);
}
@Override
@@ -90,7 +84,18 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
options.fileFormat(),
pathFactory(),
snapshotManager(),
- newScan(),
+ newScan(true),
options.mergeTreeOptions());
}
+
+ private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
+ return new KeyValueFileStoreScan(
+ partitionType,
+ keyType,
+ snapshotManager(),
+ manifestFileFactory(),
+ manifestListFactory(),
+ options.bucket(),
+ checkNumOfBuckets);
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 02d28007..1a82f296 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
@@ -28,6 +30,7 @@ import org.apache.flink.table.store.file.predicate.Literal;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -54,6 +57,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final ManifestFile.Factory manifestFileFactory;
private final ManifestList manifestList;
private final int numOfBuckets;
+ private final boolean checkNumOfBuckets;
private Predicate partitionFilter;
@@ -67,13 +71,15 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
- int numOfBuckets) {
+ int numOfBuckets,
+ boolean checkNumOfBuckets) {
this.partitionStatsConverter = new
FieldStatsArraySerializer(partitionType);
this.partitionConverter = new
RowDataToObjectArrayConverter(partitionType);
this.snapshotManager = snapshotManager;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
+ this.checkNumOfBuckets = checkNumOfBuckets;
}
@Override
@@ -181,9 +187,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
for (ManifestEntry entry : entries) {
ManifestEntry.Identifier identifier = entry.identifier();
- Preconditions.checkState(
- entry.totalBuckets() == numOfBuckets,
- "Bucket number has been changed. Manifest might be
corrupted.");
switch (entry.kind()) {
case ADD:
Preconditions.checkState(
@@ -206,7 +209,35 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
"Unknown value kind " + entry.kind().name());
}
}
- List<ManifestEntry> files = new ArrayList<>(map.values());
+ List<ManifestEntry> files = new ArrayList<>();
+ for (ManifestEntry file : map.values()) {
+ if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
+ String partInfo =
+ partitionConverter.getArity() > 0
+ ? "partition "
+ +
FileStorePathFactory.getPartitionComputer(
+
partitionConverter.rowType(),
+
FileSystemConnectorOptions
+
.PARTITION_DEFAULT_NAME
+
.defaultValue())
+
.generatePartValues(file.partition())
+ : "table";
+ throw new TableException(
+ String.format(
+ "Try to write %s with a new bucket num %d, but
the previous bucket num is %d. "
+ + "Please switch to batch mode, and
perform INSERT OVERWRITE to rescale current data layout first.",
+ partInfo, numOfBuckets, file.totalBuckets()));
+ }
+
+ // The bucket filter must not be applied together with the partition filter:
+ // specifiedBucket is computed against the current
numOfBuckets,
+ // whereas entry.bucket() was computed against the old numOfBuckets.
+ // Filtering by bucket first could leave no manifest entries,
+ // which would render the bucket-number check above ineffective.
+ if (filterByBucket(file)) {
+ files.add(file);
+ }
+ }
return new Plan() {
@Nullable
@@ -230,18 +261,16 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
private boolean filterManifestEntry(ManifestEntry entry) {
- return filterByPartitionAndBucket(entry) && filterByStats(entry);
+ return filterByPartition(entry) && filterByStats(entry);
}
- private boolean filterByPartitionAndBucket(ManifestEntry entry) {
- if (specifiedBucket != null) {
- Preconditions.checkState(
- specifiedBucket < entry.totalBuckets(),
- "Bucket number has been changed. Manifest might be
corrupted.");
- }
+ private boolean filterByPartition(ManifestEntry entry) {
return (partitionFilter == null
- ||
partitionFilter.test(partitionConverter.convert(entry.partition())))
- && (specifiedBucket == null || entry.bucket() ==
specifiedBucket);
+ ||
partitionFilter.test(partitionConverter.convert(entry.partition())));
+ }
+
+ private boolean filterByBucket(ManifestEntry entry) {
+ return (specifiedBucket == null || entry.bucket() == specifiedBucket);
}
protected abstract boolean filterByStats(ManifestEntry entry);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index 521a91e4..24ca55d8 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -39,13 +39,15 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
- int numOfBuckets) {
+ int numOfBuckets,
+ boolean checkNumOfBuckets) {
super(
partitionType,
snapshotManager,
manifestFileFactory,
manifestListFactory,
- numOfBuckets);
+ numOfBuckets,
+ checkNumOfBuckets);
this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index bdbf23fd..aa6bc701 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -39,13 +39,15 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
- int numOfBuckets) {
+ int numOfBuckets,
+ boolean checkNumOfBuckets) {
super(
partitionType,
snapshotManager,
manifestFileFactory,
manifestListFactory,
- numOfBuckets);
+ numOfBuckets,
+ checkNumOfBuckets);
this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
}