This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new c155b9d7 [FLINK-31431] Support copying a FileStoreTable with latest schema
c155b9d7 is described below
commit c155b9d7c0a6ef40327116cac88659c6be975919
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 15 10:17:41 2023 +0800
[FLINK-31431] Support copying a FileStoreTable with latest schema
This closes #600.
---
.../table/store/table/AbstractFileStoreTable.java | 16 +++++++++
.../flink/table/store/table/FileStoreTable.java | 2 ++
.../table/store/table/FileStoreTableTestBase.java | 40 ++++++++++++++++++++++
3 files changed, 58 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 31b249e6..d8d1ba8e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.store.table.source.snapshot.SnapshotSplitReaderImp
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.function.BiConsumer;
import static org.apache.flink.table.store.CoreOptions.PATH;
@@ -127,6 +128,21 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
return copy(newTableSchema);
}
+ @Override
+ public FileStoreTable copyWithLatestSchema() {
+ Map<String, String> options = tableSchema.options();
+ SchemaManager schemaManager = new SchemaManager(fileIO(), location());
+ Optional<TableSchema> optionalLatestSchema = schemaManager.latest();
+ if (optionalLatestSchema.isPresent()) {
+ TableSchema newTableSchema = optionalLatestSchema.get();
+ newTableSchema = newTableSchema.copy(options);
+ SchemaValidation.validateTableSchema(newTableSchema);
+ return copy(newTableSchema);
+ } else {
+ return this;
+ }
+ }
+
protected SchemaManager schemaManager() {
return new SchemaManager(fileIO(), path);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index eb590bfd..cd498db8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -55,6 +55,8 @@ public interface FileStoreTable extends DataTable, SupportsPartition {
@Override
FileStoreTable copy(Map<String, String> dynamicOptions);
+ FileStoreTable copyWithLatestSchema();
+
@Override
TableWriteImpl<?> newWrite(String commitUser);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index c54351e5..03b5ef04 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -24,11 +24,14 @@ import org.apache.flink.table.store.data.BinaryString;
import org.apache.flink.table.store.data.GenericMap;
import org.apache.flink.table.store.data.GenericRow;
import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.JoinedRow;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaChange;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.utils.TraceableFileIO;
import org.apache.flink.table.store.fs.FileIOFinder;
@@ -366,6 +369,43 @@ public abstract class FileStoreTableTestBase {
}
}
+ @Test
+ public void testCopyWithLatestSchema() throws Exception {
+ FileStoreTable table =
+                createFileStoreTable(conf -> conf.set(SNAPSHOT_NUM_RETAINED_MAX, 100));
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(1, 20, 200L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        schemaManager.commitChanges(SchemaChange.addColumn("added", DataTypes.INT()));
+ table = table.copyWithLatestSchema();
+ assertThat(table.options().snapshotNumRetainMax()).isEqualTo(100);
+ write = table.newWrite(commitUser);
+
+ write.write(new JoinedRow(rowData(1, 30, 300L), GenericRow.of(3000)));
+ write.write(new JoinedRow(rowData(1, 40, 400L), GenericRow.of(4000)));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ List<Split> splits = table.newScan().plan().splits();
+ TableRead read = table.newRead();
+ Function<InternalRow, String> toString =
+ rowData ->
+ BATCH_ROW_TO_STRING.apply(rowData)
+ + "|"
+                                + (rowData.isNullAt(7) ? "null" : rowData.getInt(7));
+ assertThat(getResult(read, splits, binaryRow(1), 0, toString))
+ .hasSameElementsAs(
+ Arrays.asList(
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset|null",
+                                "1|20|200|binary|varbinary|mapKey:mapVal|multiset|null",
+                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset|3000",
+                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset|4000"));
+ }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,