This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c155b9d7 [FLINK-31431] Support copying a FileStoreTable with latest schema
c155b9d7 is described below

commit c155b9d7c0a6ef40327116cac88659c6be975919
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 15 10:17:41 2023 +0800

    [FLINK-31431] Support copying a FileStoreTable with latest schema
    
    This closes #600.
---
 .../table/store/table/AbstractFileStoreTable.java  | 16 +++++++++
 .../flink/table/store/table/FileStoreTable.java    |  2 ++
 .../table/store/table/FileStoreTableTestBase.java  | 40 ++++++++++++++++++++++
 3 files changed, 58 insertions(+)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 31b249e6..d8d1ba8e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.store.table.source.snapshot.SnapshotSplitReaderImp
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.function.BiConsumer;
 
 import static org.apache.flink.table.store.CoreOptions.PATH;
@@ -127,6 +128,21 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
         return copy(newTableSchema);
     }
 
+    @Override
+    public FileStoreTable copyWithLatestSchema() {
+        Map<String, String> options = tableSchema.options();
+        SchemaManager schemaManager = new SchemaManager(fileIO(), location());
+        Optional<TableSchema> optionalLatestSchema = schemaManager.latest();
+        if (optionalLatestSchema.isPresent()) {
+            TableSchema newTableSchema = optionalLatestSchema.get();
+            newTableSchema = newTableSchema.copy(options);
+            SchemaValidation.validateTableSchema(newTableSchema);
+            return copy(newTableSchema);
+        } else {
+            return this;
+        }
+    }
+
     protected SchemaManager schemaManager() {
         return new SchemaManager(fileIO(), path);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index eb590bfd..cd498db8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -55,6 +55,8 @@ public interface FileStoreTable extends DataTable, SupportsPartition {
     @Override
     FileStoreTable copy(Map<String, String> dynamicOptions);
 
+    FileStoreTable copyWithLatestSchema();
+
     @Override
     TableWriteImpl<?> newWrite(String commitUser);
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index c54351e5..03b5ef04 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -24,11 +24,14 @@ import org.apache.flink.table.store.data.BinaryString;
 import org.apache.flink.table.store.data.GenericMap;
 import org.apache.flink.table.store.data.GenericRow;
 import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.JoinedRow;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaChange;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.utils.TraceableFileIO;
 import org.apache.flink.table.store.fs.FileIOFinder;
@@ -366,6 +369,43 @@ public abstract class FileStoreTableTestBase {
         }
     }
 
+    @Test
+    public void testCopyWithLatestSchema() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(conf -> conf.set(SNAPSHOT_NUM_RETAINED_MAX, 100));
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(1, 20, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        schemaManager.commitChanges(SchemaChange.addColumn("added", DataTypes.INT()));
+        table = table.copyWithLatestSchema();
+        assertThat(table.options().snapshotNumRetainMax()).isEqualTo(100);
+        write = table.newWrite(commitUser);
+
+        write.write(new JoinedRow(rowData(1, 30, 300L), GenericRow.of(3000)));
+        write.write(new JoinedRow(rowData(1, 40, 400L), GenericRow.of(4000)));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        List<Split> splits = table.newScan().plan().splits();
+        TableRead read = table.newRead();
+        Function<InternalRow, String> toString =
+                rowData ->
+                        BATCH_ROW_TO_STRING.apply(rowData)
+                                + "|"
+                                + (rowData.isNullAt(7) ? "null" : rowData.getInt(7));
+        assertThat(getResult(read, splits, binaryRow(1), 0, toString))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                "1|10|100|binary|varbinary|mapKey:mapVal|multiset|null",
+                                "1|20|200|binary|varbinary|mapKey:mapVal|multiset|null",
+                                "1|30|300|binary|varbinary|mapKey:mapVal|multiset|3000",
+                                "1|40|400|binary|varbinary|mapKey:mapVal|multiset|4000"));
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,

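For readers skimming the patch, a minimal usage sketch of the new API follows. The wrapper class, method name, and the way the FileStoreTable instance is obtained are hypothetical and not part of this commit; the behavior described in the comments is taken from the AbstractFileStoreTable hunk above.

import org.apache.flink.table.store.table.FileStoreTable;

public class CopyWithLatestSchemaExample {

    // Refresh a table handle after another writer may have evolved the
    // schema (e.g. via SchemaManager.commitChanges, as exercised in
    // testCopyWithLatestSchema above).
    public static FileStoreTable refreshSchema(FileStoreTable table) {
        // copyWithLatestSchema() re-reads the latest schema from the table's
        // location, re-applies this table's dynamic options on top of it,
        // validates the merged schema, and returns a fresh copy. If no
        // schema file exists yet, the original table is returned unchanged.
        return table.copyWithLatestSchema();
    }
}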