This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 00403245 [FLINK-27371] Record schema id in ManifestFileMeta and DataFileMeta
00403245 is described below

commit 0040324527da95884ed14297b263feccd0c74ea6
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 13 18:24:35 2022 +0800

    [FLINK-27371] Record schema id in ManifestFileMeta and DataFileMeta
    
    This closes #141
---
 .../table/store/connector/sink/TestFileStore.java  |  1 +
 .../source/FileStoreSourceSplitGeneratorTest.java  |  1 +
 .../source/FileStoreSourceSplitSerializerTest.java |  1 +
 .../source/TestChangelogDataReadWrite.java         |  9 +++++++-
 .../flink/table/store/file/FileStoreImpl.java      | 16 +++++++++++++++
 .../flink/table/store/file/data/DataFileMeta.java  | 18 ++++++++++++++--
 .../store/file/data/DataFileMetaSerializer.java    |  4 +++-
 .../table/store/file/data/DataFileReader.java      | 19 +++++++++++++++++
 .../table/store/file/data/DataFileWriter.java      |  8 ++++++++
 .../table/store/file/manifest/ManifestFile.java    | 18 +++++++++++++++-
 .../store/file/manifest/ManifestFileMeta.java      | 20 +++++++++++++-----
 .../file/manifest/ManifestFileMetaSerializer.java  | 17 +++++++--------
 .../store/file/operation/FileStoreReadImpl.java    |  6 +++++-
 .../store/file/operation/FileStoreWriteImpl.java   | 24 +++++++++++++++++++---
 .../table/store/file/schema/SchemaManager.java     | 16 +++++++--------
 .../table/store/file/writer/AppendOnlyWriter.java  |  7 +++++--
 .../store/table/AppendOnlyFileStoreTable.java      |  4 +++-
 .../table/ChangelogValueCountFileStoreTable.java   |  5 ++++-
 .../table/ChangelogWithKeyFileStoreTable.java      |  5 ++++-
 .../table/store/table/FileStoreTableFactory.java   |  9 ++++----
 .../flink/table/store/file/TestFileStore.java      |  2 ++
 .../flink/table/store/file/data/DataFileTest.java  |  4 ++++
 .../store/file/data/DataFileTestDataGenerator.java |  1 +
 .../ManifestCommittableSerializerTest.java         |  1 +
 .../store/file/manifest/ManifestFileMetaTest.java  |  7 ++++++-
 .../store/file/manifest/ManifestFileTest.java      |  9 +++++++-
 .../file/manifest/ManifestTestDataGenerator.java   |  3 ++-
 .../table/store/file/mergetree/LevelsTest.java     |  2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  | 14 +++++++++++--
 .../file/mergetree/compact/CompactManagerTest.java |  1 +
 .../mergetree/compact/IntervalPartitionTest.java   |  1 +
 .../mergetree/compact/UniversalCompactionTest.java |  2 +-
 .../store/file/writer/AppendOnlyWriterTest.java    |  2 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  | 18 ++++++++--------
 .../ChangelogValueCountFileStoreTableTest.java     | 19 +++++++++--------
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 19 +++++++++--------
 .../table/store/format/FileFormatSuffixTest.java   |  2 +-
 37 files changed, 239 insertions(+), 76 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 3b797e0e..639d5294 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -215,6 +215,7 @@ public class TestFileStore implements FileStore {
                                                     newEmptyTableStats(3),
                                                     0,
                                                     0,
+                                                    0,
                                                     0))
                             .collect(Collectors.toList());
             return new Increment(newFiles, Collections.emptyList(), Collections.emptyList());
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 254e6992..6af47d5d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -129,6 +129,7 @@ public class FileStoreSourceSplitGeneratorTest {
                         StatsTestUtils.newEmptyTableStats(), // not used
                         0, // not used
                         0, // not used
+                        0, // not used
                         0 // not used
                         ));
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 45ec4dd0..d01be01c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -82,6 +82,7 @@ public class FileStoreSourceSplitSerializerTest {
                 StatsTestUtils.newEmptyTableStats(),
                 0,
                 1,
+                0,
                 level);
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 2d151c3c..4aa5f332 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunct
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
 import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -66,6 +67,7 @@ public class TestChangelogDataReadWrite {
             Comparator.comparingLong(o -> o.getLong(0));
 
     private final FileFormat avro;
+    private final Path tablePath;
     private final FileStorePathFactory pathFactory;
     private final SnapshotManager snapshotManager;
     private final ExecutorService service;
@@ -76,9 +78,10 @@ public class TestChangelogDataReadWrite {
                         Thread.currentThread().getContextClassLoader(),
                         "avro",
                         new Configuration());
+        this.tablePath = new Path(root);
         this.pathFactory =
                 new FileStorePathFactory(
-                        new Path(root),
+                        tablePath,
                         RowType.of(new IntType()),
                         "default",
                         FileStoreOptions.FILE_FORMAT.defaultValue());
@@ -99,6 +102,8 @@ public class TestChangelogDataReadWrite {
                     rowDataIteratorCreator) {
         FileStoreRead read =
                 new FileStoreReadImpl(
+                        new SchemaManager(tablePath),
+                        0,
                         WriteMode.CHANGE_LOG,
                         KEY_TYPE,
                         VALUE_TYPE,
@@ -143,6 +148,8 @@ public class TestChangelogDataReadWrite {
         MergeTreeOptions options = new MergeTreeOptions(new Configuration());
         return new FileStoreWriteImpl(
                         WriteMode.CHANGE_LOG,
+                        new SchemaManager(tablePath),
+                        0,
                         KEY_TYPE,
                         VALUE_TYPE,
                         () -> COMPARATOR,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index a0b2d243..f391f3b4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
 import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
 import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
 import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -48,6 +49,7 @@ import java.util.stream.Collectors;
 /** File store implementation. */
 public class FileStoreImpl implements FileStore {
 
+    private final SchemaManager schemaManager;
     private final long schemaId;
     private final WriteMode writeMode;
     private final FileStoreOptions options;
@@ -59,6 +61,7 @@ public class FileStoreImpl implements FileStore {
     @Nullable private final MergeFunction mergeFunction;
 
     public FileStoreImpl(
+            SchemaManager schemaManager,
             long schemaId,
             FileStoreOptions options,
             WriteMode writeMode,
@@ -67,6 +70,7 @@ public class FileStoreImpl implements FileStore {
             RowType keyType,
             RowType valueType,
             @Nullable MergeFunction mergeFunction) {
+        this.schemaManager = schemaManager;
         this.schemaId = schemaId;
         this.options = options;
         this.writeMode = writeMode;
@@ -93,6 +97,8 @@ public class FileStoreImpl implements FileStore {
     @VisibleForTesting
     public ManifestFile.Factory manifestFileFactory() {
         return new ManifestFile.Factory(
+                schemaManager,
+                schemaId,
                 partitionType,
                 options.manifestFormat(),
                 pathFactory(),
@@ -108,6 +114,8 @@ public class FileStoreImpl implements FileStore {
     public FileStoreWriteImpl newWrite() {
         return new FileStoreWriteImpl(
                 writeMode,
+                schemaManager,
+                schemaId,
                 keyType,
                 valueType,
                 keyComparatorSupplier,
@@ -122,6 +130,8 @@ public class FileStoreImpl implements FileStore {
     @Override
     public FileStoreReadImpl newRead() {
         return new FileStoreReadImpl(
+                schemaManager,
+                schemaId,
                 writeMode,
                 keyType,
                 valueType,
@@ -192,12 +202,14 @@ public class FileStoreImpl implements FileStore {
     }
 
     public static FileStoreImpl createWithAppendOnly(
+            SchemaManager schemaManager,
             long schemaId,
             FileStoreOptions options,
             String user,
             RowType partitionType,
             RowType rowType) {
         return new FileStoreImpl(
+                schemaManager,
                 schemaId,
                 options,
                 WriteMode.APPEND_ONLY,
@@ -209,6 +221,7 @@ public class FileStoreImpl implements FileStore {
     }
 
     public static FileStoreImpl createWithPrimaryKey(
+            SchemaManager schemaManager,
             long schemaId,
             FileStoreOptions options,
             String user,
@@ -246,6 +259,7 @@ public class FileStoreImpl implements FileStore {
         }
 
         return new FileStoreImpl(
+                schemaManager,
                 schemaId,
                 options,
                 WriteMode.CHANGE_LOG,
@@ -257,6 +271,7 @@ public class FileStoreImpl implements FileStore {
     }
 
     public static FileStoreImpl createWithValueCount(
+            SchemaManager schemaManager,
             long schemaId,
             FileStoreOptions options,
             String user,
@@ -267,6 +282,7 @@ public class FileStoreImpl implements FileStore {
                         new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
         MergeFunction mergeFunction = new ValueCountMergeFunction();
         return new FileStoreImpl(
+                schemaManager,
                 schemaId,
                 options,
                 WriteMode.CHANGE_LOG,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
index aee265a9..e85e3f5e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
@@ -56,6 +56,7 @@ public class DataFileMeta {
 
     private final long minSequenceNumber;
     private final long maxSequenceNumber;
+    private final long schemaId;
     private final int level;
 
     public static DataFileMeta forAppend(
@@ -64,7 +65,8 @@ public class DataFileMeta {
             long rowCount,
             BinaryTableStats rowStats,
             long minSequenceNumber,
-            long maxSequenceNumber) {
+            long maxSequenceNumber,
+            long schemaId) {
         return new DataFileMeta(
                 fileName,
                 fileSize,
@@ -75,6 +77,7 @@ public class DataFileMeta {
                 rowStats,
                 minSequenceNumber,
                 maxSequenceNumber,
+                schemaId,
                 DUMMY_LEVEL);
     }
 
@@ -88,6 +91,7 @@ public class DataFileMeta {
             BinaryTableStats valueStats,
             long minSequenceNumber,
             long maxSequenceNumber,
+            long schemaId,
             int level) {
         this.fileName = fileName;
         this.fileSize = fileSize;
@@ -101,6 +105,7 @@ public class DataFileMeta {
         this.minSequenceNumber = minSequenceNumber;
         this.maxSequenceNumber = maxSequenceNumber;
         this.level = level;
+        this.schemaId = schemaId;
     }
 
     public String fileName() {
@@ -139,6 +144,10 @@ public class DataFileMeta {
         return maxSequenceNumber;
     }
 
+    public long schemaId() {
+        return schemaId;
+    }
+
     public int level() {
         return level;
     }
@@ -155,6 +164,7 @@ public class DataFileMeta {
                 valueStats,
                 minSequenceNumber,
                 maxSequenceNumber,
+                schemaId,
                 newLevel);
     }
 
@@ -173,6 +183,7 @@ public class DataFileMeta {
                 && Objects.equals(valueStats, that.valueStats)
                 && minSequenceNumber == that.minSequenceNumber
                 && maxSequenceNumber == that.maxSequenceNumber
+                && schemaId == that.schemaId
                 && level == that.level;
     }
 
@@ -188,13 +199,14 @@ public class DataFileMeta {
                 valueStats,
                 minSequenceNumber,
                 maxSequenceNumber,
+                schemaId,
                 level);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d}",
+                "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d}",
                 fileName,
                 fileSize,
                 rowCount,
@@ -204,6 +216,7 @@ public class DataFileMeta {
                 valueStats,
                 minSequenceNumber,
                 maxSequenceNumber,
+                schemaId,
                 level);
     }
 
@@ -218,6 +231,7 @@ public class DataFileMeta {
         fields.add(new RowType.RowField("_VALUE_STATS", FieldStatsArraySerializer.schema()));
         fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
         fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
+        fields.add(new RowType.RowField("_SCHEMA_ID", new BigIntType(false)));
         fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
         return new RowType(fields);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
index ef1db428..6050a8fb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
@@ -48,6 +48,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 meta.valueStats().toRowData(),
                 meta.minSequenceNumber(),
                 meta.maxSequenceNumber(),
+                meta.schemaId(),
                 meta.level());
     }
 
@@ -63,6 +64,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 BinaryTableStats.fromRowData(row.getRow(6, 3)),
                 row.getLong(7),
                 row.getLong(8),
-                row.getInt(9));
+                row.getLong(9),
+                row.getInt(10));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index fccd7a6e..aecf25ae 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RecordReader;
@@ -45,16 +46,24 @@ import java.io.IOException;
  */
 public class DataFileReader {
 
+    private final SchemaManager schemaManager;
+    private final long schemaId;
     private final RowType keyType;
     private final RowType valueType;
+
+    // TODO introduce Map<SchemaId, readerFactory>
     private final BulkFormat<RowData, FileSourceSplit> readerFactory;
     private final DataFilePathFactory pathFactory;
 
     private DataFileReader(
+            SchemaManager schemaManager,
+            long schemaId,
             RowType keyType,
             RowType valueType,
             BulkFormat<RowData, FileSourceSplit> readerFactory,
             DataFilePathFactory pathFactory) {
+        this.schemaManager = schemaManager;
+        this.schemaId = schemaId;
         this.keyType = keyType;
         this.valueType = valueType;
         this.readerFactory = readerFactory;
@@ -104,6 +113,8 @@ public class DataFileReader {
         @Override
         public KeyValue next() throws IOException {
             RecordAndPosition<RowData> result = iterator.next();
+
+            // TODO schema evolution
             return result == null ? null : serializer.fromRow(result.getRecord());
         }
 
@@ -116,6 +127,8 @@ public class DataFileReader {
     /** Creates {@link DataFileReader}. */
     public static class Factory {
 
+        private final SchemaManager schemaManager;
+        private final long schemaId;
         private final RowType keyType;
         private final RowType valueType;
         private final FileFormat fileFormat;
@@ -127,10 +140,14 @@ public class DataFileReader {
         private RowType projectedValueType;
 
         public Factory(
+                SchemaManager schemaManager,
+                long schemaId,
                 RowType keyType,
                 RowType valueType,
                 FileFormat fileFormat,
                 FileStorePathFactory pathFactory) {
+            this.schemaManager = schemaManager;
+            this.schemaId = schemaId;
             this.keyType = keyType;
             this.valueType = valueType;
             this.fileFormat = fileFormat;
@@ -158,6 +175,8 @@ public class DataFileReader {
             int[][] projection =
                     KeyValue.project(keyProjection, valueProjection, keyType.getFieldCount());
             return new DataFileReader(
+                    schemaManager,
+                    schemaId,
                     projectedKeyType,
                     projectedValueType,
                     fileFormat.createReaderFactory(recordType, projection),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
index 096daad7..91ac96d9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
@@ -57,6 +57,7 @@ public class DataFileWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(DataFileWriter.class);
 
+    private final long schemaId;
     private final RowType keyType;
     private final RowType valueType;
     private final BulkWriter.Factory<RowData> writerFactory;
@@ -67,12 +68,14 @@ public class DataFileWriter {
     private final long suggestedFileSize;
 
     private DataFileWriter(
+            long schemaId,
             RowType keyType,
             RowType valueType,
             BulkWriter.Factory<RowData> writerFactory,
             @Nullable FileStatsExtractor fileStatsExtractor,
             DataFilePathFactory pathFactory,
             long suggestedFileSize) {
+        this.schemaId = schemaId;
         this.keyType = keyType;
         this.valueType = valueType;
         this.writerFactory = writerFactory;
@@ -199,6 +202,7 @@ public class DataFileWriter {
                     valueStats,
                     minSeqNumber,
                     maxSeqNumber,
+                    schemaId,
                     level);
         }
     }
@@ -235,6 +239,7 @@ public class DataFileWriter {
     /** Creates {@link DataFileWriter}. */
     public static class Factory {
 
+        private final long schemaId;
         private final RowType keyType;
         private final RowType valueType;
         private final FileFormat fileFormat;
@@ -242,11 +247,13 @@ public class DataFileWriter {
         private final long suggestedFileSize;
 
         public Factory(
+                long schemaId,
                 RowType keyType,
                 RowType valueType,
                 FileFormat fileFormat,
                 FileStorePathFactory pathFactory,
                 long suggestedFileSize) {
+            this.schemaId = schemaId;
             this.keyType = keyType;
             this.valueType = valueType;
             this.fileFormat = fileFormat;
@@ -257,6 +264,7 @@ public class DataFileWriter {
         public DataFileWriter create(BinaryRowData partition, int bucket) {
             RowType recordType = KeyValue.schema(keyType, valueType);
             return new DataFileWriter(
+                    schemaId,
                     keyType,
                     valueType,
                     fileFormat.createWriterFactory(recordType),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 9aa30c3c..8b9d43d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.FieldStatsCollector;
 import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -53,6 +54,8 @@ public class ManifestFile {
 
     private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class);
 
+    private final SchemaManager schemaManager;
+    private final long schemaId;
     private final RowType partitionType;
     private final ManifestEntrySerializer serializer;
     private final BulkFormat<RowData, FileSourceSplit> readerFactory;
@@ -61,6 +64,8 @@ public class ManifestFile {
     private final FileWriter.Factory<ManifestEntry, Metric> fileWriterFactory;
 
     private ManifestFile(
+            SchemaManager schemaManager,
+            long schemaId,
             RowType partitionType,
             RowType entryType,
             ManifestEntrySerializer serializer,
@@ -69,6 +74,8 @@ public class ManifestFile {
             FileStatsExtractor fileStatsExtractor,
             FileStorePathFactory pathFactory,
             long suggestedFileSize) {
+        this.schemaManager = schemaManager;
+        this.schemaId = schemaId;
         this.partitionType = partitionType;
         this.serializer = serializer;
         this.readerFactory = readerFactory;
@@ -160,7 +167,8 @@ public class ManifestFile {
                     path.getFileSystem().getFileStatus(path).getLen(),
                     numAddedFiles,
                     numDeletedFiles,
-                    partitionStatsCollector.extract());
+                    partitionStatsCollector.extract(),
+                    schemaId);
         }
     }
 
@@ -193,16 +201,22 @@ public class ManifestFile {
      */
     public static class Factory {
 
+        private final SchemaManager schemaManager;
+        private final long schemaId;
         private final RowType partitionType;
         private final FileFormat fileFormat;
         private final FileStorePathFactory pathFactory;
         private final long suggestedFileSize;
 
         public Factory(
+                SchemaManager schemaManager,
+                long schemaId,
                 RowType partitionType,
                 FileFormat fileFormat,
                 FileStorePathFactory pathFactory,
                 long suggestedFileSize) {
+            this.schemaManager = schemaManager;
+            this.schemaId = schemaId;
             this.partitionType = partitionType;
             this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
@@ -212,6 +226,8 @@ public class ManifestFile {
         public ManifestFile create() {
             RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.schema());
             return new ManifestFile(
+                    schemaManager,
+                    schemaId,
                     partitionType,
                     entryType,
                     new ManifestEntrySerializer(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index 80d2bbd6..e80da5f7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -39,18 +39,21 @@ public class ManifestFileMeta {
     private final long numAddedFiles;
     private final long numDeletedFiles;
     private final BinaryTableStats partitionStats;
+    private final long schemaId;
 
     public ManifestFileMeta(
             String fileName,
             long fileSize,
             long numAddedFiles,
             long numDeletedFiles,
-            BinaryTableStats partitionStats) {
+            BinaryTableStats partitionStats,
+            long schemaId) {
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.numAddedFiles = numAddedFiles;
         this.numDeletedFiles = numDeletedFiles;
         this.partitionStats = partitionStats;
+        this.schemaId = schemaId;
     }
 
     public String fileName() {
@@ -73,6 +76,10 @@ public class ManifestFileMeta {
         return partitionStats;
     }
 
+    public long schemaId() {
+        return schemaId;
+    }
+
     public static RowType schema() {
         List<RowType.RowField> fields = new ArrayList<>();
         fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
@@ -80,6 +87,7 @@ public class ManifestFileMeta {
         fields.add(new RowType.RowField("_NUM_ADDED_FILES", new BigIntType(false)));
         fields.add(new RowType.RowField("_NUM_DELETED_FILES", new BigIntType(false)));
         fields.add(new RowType.RowField("_PARTITION_STATS", FieldStatsArraySerializer.schema()));
+        fields.add(new RowType.RowField("_SCHEMA_ID", new BigIntType(false)));
         return new RowType(fields);
     }
 
@@ -93,19 +101,21 @@ public class ManifestFileMeta {
                 && fileSize == that.fileSize
                 && numAddedFiles == that.numAddedFiles
                 && numDeletedFiles == that.numDeletedFiles
-                && Objects.equals(partitionStats, that.partitionStats);
+                && Objects.equals(partitionStats, that.partitionStats)
+                && schemaId == that.schemaId;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(fileName, fileSize, numAddedFiles, numDeletedFiles, partitionStats);
+        return Objects.hash(
+                fileName, fileSize, numAddedFiles, numDeletedFiles, partitionStats, schemaId);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %d, %s}",
-                fileName, fileSize, numAddedFiles, numDeletedFiles, partitionStats);
+                "{%s, %d, %d, %d, %s, %d}",
+                fileName, fileSize, numAddedFiles, numDeletedFiles, partitionStats, schemaId);
     }
 
     /**
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
index ee306daa..37ee023b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
@@ -40,13 +40,13 @@ public class ManifestFileMetaSerializer extends 
VersionedObjectSerializer<Manife
 
     @Override
     public RowData convertTo(ManifestFileMeta meta) {
-        GenericRowData row = new GenericRowData(5);
-        row.setField(0, StringData.fromString(meta.fileName()));
-        row.setField(1, meta.fileSize());
-        row.setField(2, meta.numAddedFiles());
-        row.setField(3, meta.numDeletedFiles());
-        row.setField(4, meta.partitionStats().toRowData());
-        return row;
+        return GenericRowData.of(
+                StringData.fromString(meta.fileName()),
+                meta.fileSize(),
+                meta.numAddedFiles(),
+                meta.numDeletedFiles(),
+                meta.partitionStats().toRowData(),
+                meta.schemaId());
     }
 
     @Override
@@ -65,6 +65,7 @@ public class ManifestFileMetaSerializer extends 
VersionedObjectSerializer<Manife
                 row.getLong(1),
                 row.getLong(2),
                 row.getLong(3),
-                BinaryTableStats.fromRowData(row.getRow(4, 3)));
+                BinaryTableStats.fromRowData(row.getRow(4, 3)),
+                row.getLong(5));
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index 995ba622..5d6ef5ca 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.types.logical.RowType;
@@ -53,6 +54,8 @@ public class FileStoreReadImpl implements FileStoreRead {
     private boolean dropDelete = true;
 
     public FileStoreReadImpl(
+            SchemaManager schemaManager,
+            long schemaId,
             WriteMode writeMode,
             RowType keyType,
             RowType valueType,
@@ -61,7 +64,8 @@ public class FileStoreReadImpl implements FileStoreRead {
             FileFormat fileFormat,
             FileStorePathFactory pathFactory) {
         this.dataFileReaderFactory =
-                new DataFileReader.Factory(keyType, valueType, fileFormat, 
pathFactory);
+                new DataFileReader.Factory(
+                        schemaManager, schemaId, keyType, valueType, 
fileFormat, pathFactory);
         this.writeMode = writeMode;
         this.keyComparator = keyComparator;
         this.mergeFunction = mergeFunction;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index fba8c330..b28653f1 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -58,6 +59,8 @@ import java.util.function.Supplier;
 public class FileStoreWriteImpl implements FileStoreWrite {
 
     private final WriteMode writeMode;
+    private final SchemaManager schemaManager;
+    private final long schemaId;
     private final RowType valueType;
     private final DataFileReader.Factory dataFileReaderFactory;
     private final DataFileWriter.Factory dataFileWriterFactory;
@@ -71,6 +74,8 @@ public class FileStoreWriteImpl implements FileStoreWrite {
 
     public FileStoreWriteImpl(
             WriteMode writeMode,
+            SchemaManager schemaManager,
+            long schemaId,
             RowType keyType,
             RowType valueType,
             Supplier<Comparator<RowData>> keyComparatorSupplier,
@@ -80,12 +85,20 @@ public class FileStoreWriteImpl implements FileStoreWrite {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             MergeTreeOptions options) {
+        this.schemaManager = schemaManager;
+        this.schemaId = schemaId;
         this.valueType = valueType;
         this.dataFileReaderFactory =
-                new DataFileReader.Factory(keyType, valueType, fileFormat, 
pathFactory);
+                new DataFileReader.Factory(
+                        schemaManager, schemaId, keyType, valueType, 
fileFormat, pathFactory);
         this.dataFileWriterFactory =
                 new DataFileWriter.Factory(
-                        keyType, valueType, fileFormat, pathFactory, 
options.targetFileSize);
+                        schemaId,
+                        keyType,
+                        valueType,
+                        fileFormat,
+                        pathFactory,
+                        options.targetFileSize);
         this.writeMode = writeMode;
         this.fileFormat = fileFormat;
         this.keyComparatorSupplier = keyComparatorSupplier;
@@ -121,7 +134,12 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                                 .orElse(-1L);
 
                 return new AppendOnlyWriter(
-                        fileFormat, options.targetFileSize, valueType, 
maxSeqNum, factory);
+                        schemaId,
+                        fileFormat,
+                        options.targetFileSize,
+                        valueType,
+                        maxSeqNum,
+                        factory);
 
             case CHANGE_LOG:
                 if (latestSnapshotId == null) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index ac835fc1..2613c4a4 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
@@ -37,24 +38,16 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
 
 /** Schema Manager to manage schema versions. */
-public class SchemaManager {
+public class SchemaManager implements Serializable {
 
     private static final String SCHEMA_PREFIX = "schema-";
 
     private final Path tableRoot;
 
-    /** Default no lock. */
-    private Lock lock = Callable::call;
-
     public SchemaManager(Path tableRoot) {
         this.tableRoot = tableRoot;
     }
 
-    public SchemaManager withLock(Lock lock) {
-        this.lock = lock;
-        return this;
-    }
-
     /** @return latest schema. */
     public Optional<Schema> latest() {
         try {
@@ -83,6 +76,11 @@ public class SchemaManager {
 
     /** Create a new schema from {@link UpdateSchema}. */
     public Schema commitNewVersion(UpdateSchema updateSchema) throws Exception 
{
+        return commitNewVersion(Callable::call, updateSchema);
+    }
+
+    /** Create a new schema from {@link UpdateSchema}. */
+    public Schema commitNewVersion(Lock lock, UpdateSchema updateSchema) 
throws Exception {
         RowType rowType = updateSchema.rowType();
         List<String> partitionKeys = updateSchema.partitionKeys();
         List<String> primaryKeys = updateSchema.primaryKeys();
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
index d5343319..a3842c77 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
@@ -43,6 +43,7 @@ import java.util.function.Supplier;
  * operations and don't have any unique keys or sort keys.
  */
 public class AppendOnlyWriter implements RecordWriter {
+    private final long schemaId;
     private final long targetFileSize;
     private final DataFilePathFactory pathFactory;
     private final FieldStatsArraySerializer statsArraySerializer;
@@ -53,12 +54,13 @@ public class AppendOnlyWriter implements RecordWriter {
     private RowRollingWriter writer;
 
     public AppendOnlyWriter(
+            long schemaId,
             FileFormat fileFormat,
             long targetFileSize,
             RowType writeSchema,
             long maxWroteSeqNumber,
             DataFilePathFactory pathFactory) {
-
+        this.schemaId = schemaId;
         this.targetFileSize = targetFileSize;
         this.pathFactory = pathFactory;
         this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
@@ -158,7 +160,8 @@ public class AppendOnlyWriter implements RecordWriter {
                     recordCount(),
                     stats,
                     minSeqNum,
-                    Math.max(minSeqNum, nextSeqNum - 1));
+                    Math.max(minSeqNum, nextSeqNum - 1),
+                    schemaId);
         }
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 26472592..ff0c8539 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -46,10 +47,11 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
 
     private final FileStoreImpl store;
 
-    AppendOnlyFileStoreTable(String name, Schema schema, String user) {
+    AppendOnlyFileStoreTable(String name, SchemaManager schemaManager, Schema 
schema, String user) {
         super(name, schema);
         this.store =
                 new FileStoreImpl(
+                        schemaManager,
                         schema.id(),
                         new FileStoreOptions(schema.options()),
                         WriteMode.APPEND_ONLY,
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index caa98e3b..79785d6e 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -48,7 +49,8 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
 
     private final FileStoreImpl store;
 
-    ChangelogValueCountFileStoreTable(String name, Schema schema, String user) 
{
+    ChangelogValueCountFileStoreTable(
+            String name, SchemaManager schemaManager, Schema schema, String 
user) {
         super(name, schema);
         RowType countType =
                 RowType.of(
@@ -56,6 +58,7 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
         MergeFunction mergeFunction = new ValueCountMergeFunction();
         this.store =
                 new FileStoreImpl(
+                        schemaManager,
                         schema.id(),
                         new FileStoreOptions(schema.options()),
                         WriteMode.CHANGE_LOG,
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index f02259e6..6d611638 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFun
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -54,7 +55,8 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
 
     private final FileStoreImpl store;
 
-    ChangelogWithKeyFileStoreTable(String name, Schema schema, String user) {
+    ChangelogWithKeyFileStoreTable(
+            String name, SchemaManager schemaManager, Schema schema, String 
user) {
         super(name, schema);
         RowType rowType = schema.logicalRowType();
 
@@ -92,6 +94,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
 
         this.store =
                 new FileStoreImpl(
+                        schemaManager,
                         schema.id(),
                         new FileStoreOptions(conf),
                         WriteMode.CHANGE_LOG,
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index ae2feb62..3f2d4c97 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -39,8 +39,9 @@ public class FileStoreTableFactory {
     public static FileStoreTable create(Configuration conf, String user) {
         Path tablePath = FileStoreOptions.path(conf);
         String name = tablePath.getName();
+        SchemaManager schemaManager = new SchemaManager(tablePath);
         Schema schema =
-                new SchemaManager(tablePath)
+                schemaManager
                         .latest()
                         .orElseThrow(
                                 () ->
@@ -55,12 +56,12 @@ public class FileStoreTableFactory {
         schema = schema.copy(newOptions);
 
         if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
-            return new AppendOnlyFileStoreTable(name, schema, user);
+            return new AppendOnlyFileStoreTable(name, schemaManager, schema, 
user);
         } else {
             if (schema.primaryKeys().isEmpty()) {
-                return new ChangelogValueCountFileStoreTable(name, schema, 
user);
+                return new ChangelogValueCountFileStoreTable(name, 
schemaManager, schema, user);
             } else {
-                return new ChangelogWithKeyFileStoreTable(name, schema, user);
+                return new ChangelogWithKeyFileStoreTable(name, schemaManager, 
schema, user);
             }
         }
     }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index a8180385..7b872218 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -111,6 +112,7 @@ public class TestFileStore extends FileStoreImpl {
             RowType valueType,
             MergeFunction mergeFunction) {
         super(
+                new SchemaManager(options.path()),
                 0L,
                 options,
                 WriteMode.CHANGE_LOG,
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 781a91e6..81694a3f 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializerTest;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -207,6 +208,7 @@ public class DataFileTest {
                         format);
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 
1024;
         return new DataFileWriter.Factory(
+                        0,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         // normal format will buffer changes in memory and we 
can't determine
@@ -223,6 +225,8 @@ public class DataFileTest {
         FileStorePathFactory pathFactory = new FileStorePathFactory(new 
Path(path));
         DataFileReader.Factory factory =
                 new DataFileReader.Factory(
+                        new SchemaManager(new Path(path)),
+                        0,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         new FlushingFileFormat(format),
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
index 2163ed62..86345713 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
@@ -137,6 +137,7 @@ public class DataFileTestDataGenerator {
                         valueStatsCollector.extract(),
                         minSequenceNumber,
                         maxSequenceNumber,
+                        0,
                         level),
                 kvs);
     }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 4ed387ac..df629d52 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -96,6 +96,7 @@ public class ManifestCommittableSerializerTest {
                 newTableStats(0, 1),
                 0,
                 1,
+                0,
                 level);
     }
 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index bcb6d76f..32b0051e 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -143,6 +144,8 @@ public class ManifestFileMetaTest {
 
     private ManifestFile createManifestFile(String path) {
         return new ManifestFile.Factory(
+                        new SchemaManager(new Path(path)),
+                        0,
                         PARTITION_TYPE,
                         avro,
                         new FileStorePathFactory(
@@ -222,7 +225,8 @@ public class ManifestFileMetaTest {
                 entries.length * 100, // for testing purpose
                 writtenMeta.numAddedFiles(),
                 writtenMeta.numDeletedFiles(),
-                writtenMeta.partitionStats());
+                writtenMeta.partitionStats(),
+                0);
     }
 
     private ManifestEntry makeEntry(boolean isAdd, String fileName) {
@@ -246,6 +250,7 @@ public class ManifestFileMetaTest {
                         StatsTestUtils.newEmptyTableStats(), // not used
                         0, // not used
                         0, // not used
+                        0, // not used
                         0 // not used
                         ));
     }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 535d63dd..1f6cf287 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -100,7 +101,13 @@ public class ManifestFileTest {
                         "default",
                         FileStoreOptions.FILE_FORMAT.defaultValue());
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 
1024;
-        return new ManifestFile.Factory(DEFAULT_PART_TYPE, avro, pathFactory, 
suggestedFileSize)
+        return new ManifestFile.Factory(
+                        new SchemaManager(new Path(path)),
+                        0,
+                        DEFAULT_PART_TYPE,
+                        avro,
+                        pathFactory,
+                        suggestedFileSize)
                 .create();
     }
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index 659ae3b8..11fc3813 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -101,7 +101,8 @@ public class ManifestTestDataGenerator {
                 entries.size() * 100L,
                 numAddedFiles,
                 numDeletedFiles,
-                collector.extract());
+                collector.extract(),
+                0);
     }
 
     private void mergeLevelsIfNeeded(BinaryRowData partition, int bucket) {
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
index acc78b7a..674bb2ec 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -62,6 +62,6 @@ public class LevelsTest {
     }
 
     public static DataFileMeta newFile(int level) {
-        return new DataFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, 
level);
+        return new DataFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, 0, 
level);
     }
 }
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 5683ace2..331e8a9e 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -74,6 +75,7 @@ public class MergeTreeTest {
 
     @TempDir java.nio.file.Path tempDir;
     private static ExecutorService service;
+    private Path path;
     private FileStorePathFactory pathFactory;
     private Comparator<RowData> comparator;
 
@@ -84,7 +86,8 @@ public class MergeTreeTest {
 
     @BeforeEach
     public void beforeEach() throws IOException {
-        pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
+        path = new Path(tempDir.toString());
+        pathFactory = new FileStorePathFactory(path);
         comparator = Comparator.comparingInt(o -> o.getInt(0));
         recreateMergeTree(1024 * 1024);
         Path bucketDir = 
dataFileWriter.pathFactory().toPath("ignore").getParent();
@@ -101,10 +104,17 @@ public class MergeTreeTest {
         RowType valueType = new RowType(singletonList(new 
RowType.RowField("v", new IntType())));
         FileFormat flushingAvro = new FlushingFileFormat("avro");
         dataFileReader =
-                new DataFileReader.Factory(keyType, valueType, flushingAvro, 
pathFactory)
+                new DataFileReader.Factory(
+                                new SchemaManager(path),
+                                0,
+                                keyType,
+                                valueType,
+                                flushingAvro,
+                                pathFactory)
                         .create(BinaryRowDataUtil.EMPTY_ROW, 0);
         dataFileWriter =
                 new DataFileWriter.Factory(
+                                0,
                                 keyType,
                                 valueType,
                                 flushingAvro,
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 98c0b58b..bffac73e 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -214,6 +214,7 @@ public class CompactManagerTest {
                 null,
                 0,
                 maxSequence,
+                0,
                 level);
     }
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
index e4ed061f..332de1a7 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
@@ -176,6 +176,7 @@ public class IntervalPartitionTest {
                 StatsTestUtils.newEmptyTableStats(), // not used
                 0,
                 24,
+                0,
                 0);
     }
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index 5f388690..7dcb6f61 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -227,6 +227,6 @@ public class UniversalCompactionTest {
     }
 
     private DataFileMeta file(long size) {
-        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0);
+        return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
index 59cce2c7..f5927de3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
@@ -226,6 +226,6 @@ public class AppendOnlyWriterTest {
                         Thread.currentThread().getContextClassLoader(), AVRO, new Configuration());
 
         return new AppendOnlyWriter(
-                fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
+                0, fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index b69ebb1f..22460fee 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -173,15 +173,15 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         conf.set(FileStoreOptions.PATH, tablePath.toString());
         conf.set(FileStoreOptions.FILE_FORMAT, "avro");
         conf.set(FileStoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        SchemaManager schemaManager = new SchemaManager(tablePath);
         Schema schema =
-                new SchemaManager(tablePath)
-                        .commitNewVersion(
-                                new UpdateSchema(
-                                        ROW_TYPE,
-                                        Collections.singletonList("pt"),
-                                        Collections.emptyList(),
-                                        conf.toMap(),
-                                        ""));
-        return new AppendOnlyFileStoreTable(tablePath.getName(), schema, "user");
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Collections.singletonList("pt"),
+                                Collections.emptyList(),
+                                conf.toMap(),
+                                ""));
+        return new AppendOnlyFileStoreTable(tablePath.getName(), schemaManager, schema, "user");
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index bed0cad7..7154062c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -170,15 +170,16 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         conf.set(FileStoreOptions.PATH, tablePath.toString());
         conf.set(FileStoreOptions.FILE_FORMAT, "avro");
         conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        SchemaManager schemaManager = new SchemaManager(tablePath);
         Schema schema =
-                new SchemaManager(tablePath)
-                        .commitNewVersion(
-                                new UpdateSchema(
-                                        ROW_TYPE,
-                                        Collections.singletonList("pt"),
-                                        Collections.emptyList(),
-                                        conf.toMap(),
-                                        ""));
-        return new ChangelogValueCountFileStoreTable(tablePath.getName(), schema, "user");
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Collections.singletonList("pt"),
+                                Collections.emptyList(),
+                                conf.toMap(),
+                                ""));
+        return new ChangelogValueCountFileStoreTable(
+                tablePath.getName(), schemaManager, schema, "user");
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index e575651b..b5da3cd4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -180,15 +180,16 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         conf.set(FileStoreOptions.PATH, tablePath.toString());
         conf.set(FileStoreOptions.FILE_FORMAT, "avro");
         conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        SchemaManager schemaManager = new SchemaManager(tablePath);
         Schema schema =
-                new SchemaManager(tablePath)
-                        .commitNewVersion(
-                                new UpdateSchema(
-                                        ROW_TYPE,
-                                        Collections.singletonList("pt"),
-                                        Arrays.asList("pt", "a"),
-                                        conf.toMap(),
-                                        ""));
-        return new ChangelogWithKeyFileStoreTable(tablePath.getName(), schema, "user");
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "a"),
+                                conf.toMap(),
+                                ""));
+        return new ChangelogWithKeyFileStoreTable(
+                tablePath.getName(), schemaManager, schema, "user");
     }
 }
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
index a7ff2005..53a031c5 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
@@ -66,7 +66,7 @@ public class FileFormatSuffixTest extends DataFileTest {
                         format,
                         new Configuration());
         AppendOnlyWriter appendOnlyWriter =
-                new AppendOnlyWriter(fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
+                new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
         appendOnlyWriter.write(
                 ValueKind.ADD,
                 BinaryRowDataUtil.EMPTY_ROW,

Reply via email to