This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 00403245 [FLINK-27371] Record schema id in ManifestFileMeta and
DataFileMeta
00403245 is described below
commit 0040324527da95884ed14297b263feccd0c74ea6
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 13 18:24:35 2022 +0800
[FLINK-27371] Record schema id in ManifestFileMeta and DataFileMeta
This closes #141
---
.../table/store/connector/sink/TestFileStore.java | 1 +
.../source/FileStoreSourceSplitGeneratorTest.java | 1 +
.../source/FileStoreSourceSplitSerializerTest.java | 1 +
.../source/TestChangelogDataReadWrite.java | 9 +++++++-
.../flink/table/store/file/FileStoreImpl.java | 16 +++++++++++++++
.../flink/table/store/file/data/DataFileMeta.java | 18 ++++++++++++++--
.../store/file/data/DataFileMetaSerializer.java | 4 +++-
.../table/store/file/data/DataFileReader.java | 19 +++++++++++++++++
.../table/store/file/data/DataFileWriter.java | 8 ++++++++
.../table/store/file/manifest/ManifestFile.java | 18 +++++++++++++++-
.../store/file/manifest/ManifestFileMeta.java | 20 +++++++++++++-----
.../file/manifest/ManifestFileMetaSerializer.java | 17 +++++++--------
.../store/file/operation/FileStoreReadImpl.java | 6 +++++-
.../store/file/operation/FileStoreWriteImpl.java | 24 +++++++++++++++++++---
.../table/store/file/schema/SchemaManager.java | 16 +++++++--------
.../table/store/file/writer/AppendOnlyWriter.java | 7 +++++--
.../store/table/AppendOnlyFileStoreTable.java | 4 +++-
.../table/ChangelogValueCountFileStoreTable.java | 5 ++++-
.../table/ChangelogWithKeyFileStoreTable.java | 5 ++++-
.../table/store/table/FileStoreTableFactory.java | 9 ++++----
.../flink/table/store/file/TestFileStore.java | 2 ++
.../flink/table/store/file/data/DataFileTest.java | 4 ++++
.../store/file/data/DataFileTestDataGenerator.java | 1 +
.../ManifestCommittableSerializerTest.java | 1 +
.../store/file/manifest/ManifestFileMetaTest.java | 7 ++++++-
.../store/file/manifest/ManifestFileTest.java | 9 +++++++-
.../file/manifest/ManifestTestDataGenerator.java | 3 ++-
.../table/store/file/mergetree/LevelsTest.java | 2 +-
.../table/store/file/mergetree/MergeTreeTest.java | 14 +++++++++++--
.../file/mergetree/compact/CompactManagerTest.java | 1 +
.../mergetree/compact/IntervalPartitionTest.java | 1 +
.../mergetree/compact/UniversalCompactionTest.java | 2 +-
.../store/file/writer/AppendOnlyWriterTest.java | 2 +-
.../store/table/AppendOnlyFileStoreTableTest.java | 18 ++++++++--------
.../ChangelogValueCountFileStoreTableTest.java | 19 +++++++++--------
.../table/ChangelogWithKeyFileStoreTableTest.java | 19 +++++++++--------
.../table/store/format/FileFormatSuffixTest.java | 2 +-
37 files changed, 239 insertions(+), 76 deletions(-)
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 3b797e0e..639d5294 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -215,6 +215,7 @@ public class TestFileStore implements FileStore {
newEmptyTableStats(3),
0,
0,
+ 0,
0))
.collect(Collectors.toList());
return new Increment(newFiles, Collections.emptyList(),
Collections.emptyList());
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 254e6992..6af47d5d 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -129,6 +129,7 @@ public class FileStoreSourceSplitGeneratorTest {
StatsTestUtils.newEmptyTableStats(), // not used
0, // not used
0, // not used
+ 0, // not used
0 // not used
));
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 45ec4dd0..d01be01c 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -82,6 +82,7 @@ public class FileStoreSourceSplitSerializerTest {
StatsTestUtils.newEmptyTableStats(),
0,
1,
+ 0,
level);
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 2d151c3c..4aa5f332 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -35,6 +35,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunct
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -66,6 +67,7 @@ public class TestChangelogDataReadWrite {
Comparator.comparingLong(o -> o.getLong(0));
private final FileFormat avro;
+ private final Path tablePath;
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
private final ExecutorService service;
@@ -76,9 +78,10 @@ public class TestChangelogDataReadWrite {
Thread.currentThread().getContextClassLoader(),
"avro",
new Configuration());
+ this.tablePath = new Path(root);
this.pathFactory =
new FileStorePathFactory(
- new Path(root),
+ tablePath,
RowType.of(new IntType()),
"default",
FileStoreOptions.FILE_FORMAT.defaultValue());
@@ -99,6 +102,8 @@ public class TestChangelogDataReadWrite {
rowDataIteratorCreator) {
FileStoreRead read =
new FileStoreReadImpl(
+ new SchemaManager(tablePath),
+ 0,
WriteMode.CHANGE_LOG,
KEY_TYPE,
VALUE_TYPE,
@@ -143,6 +148,8 @@ public class TestChangelogDataReadWrite {
MergeTreeOptions options = new MergeTreeOptions(new Configuration());
return new FileStoreWriteImpl(
WriteMode.CHANGE_LOG,
+ new SchemaManager(tablePath),
+ 0,
KEY_TYPE,
VALUE_TYPE,
() -> COMPARATOR,
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index a0b2d243..f391f3b4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -31,6 +31,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -48,6 +49,7 @@ import java.util.stream.Collectors;
/** File store implementation. */
public class FileStoreImpl implements FileStore {
+ private final SchemaManager schemaManager;
private final long schemaId;
private final WriteMode writeMode;
private final FileStoreOptions options;
@@ -59,6 +61,7 @@ public class FileStoreImpl implements FileStore {
@Nullable private final MergeFunction mergeFunction;
public FileStoreImpl(
+ SchemaManager schemaManager,
long schemaId,
FileStoreOptions options,
WriteMode writeMode,
@@ -67,6 +70,7 @@ public class FileStoreImpl implements FileStore {
RowType keyType,
RowType valueType,
@Nullable MergeFunction mergeFunction) {
+ this.schemaManager = schemaManager;
this.schemaId = schemaId;
this.options = options;
this.writeMode = writeMode;
@@ -93,6 +97,8 @@ public class FileStoreImpl implements FileStore {
@VisibleForTesting
public ManifestFile.Factory manifestFileFactory() {
return new ManifestFile.Factory(
+ schemaManager,
+ schemaId,
partitionType,
options.manifestFormat(),
pathFactory(),
@@ -108,6 +114,8 @@ public class FileStoreImpl implements FileStore {
public FileStoreWriteImpl newWrite() {
return new FileStoreWriteImpl(
writeMode,
+ schemaManager,
+ schemaId,
keyType,
valueType,
keyComparatorSupplier,
@@ -122,6 +130,8 @@ public class FileStoreImpl implements FileStore {
@Override
public FileStoreReadImpl newRead() {
return new FileStoreReadImpl(
+ schemaManager,
+ schemaId,
writeMode,
keyType,
valueType,
@@ -192,12 +202,14 @@ public class FileStoreImpl implements FileStore {
}
public static FileStoreImpl createWithAppendOnly(
+ SchemaManager schemaManager,
long schemaId,
FileStoreOptions options,
String user,
RowType partitionType,
RowType rowType) {
return new FileStoreImpl(
+ schemaManager,
schemaId,
options,
WriteMode.APPEND_ONLY,
@@ -209,6 +221,7 @@ public class FileStoreImpl implements FileStore {
}
public static FileStoreImpl createWithPrimaryKey(
+ SchemaManager schemaManager,
long schemaId,
FileStoreOptions options,
String user,
@@ -246,6 +259,7 @@ public class FileStoreImpl implements FileStore {
}
return new FileStoreImpl(
+ schemaManager,
schemaId,
options,
WriteMode.CHANGE_LOG,
@@ -257,6 +271,7 @@ public class FileStoreImpl implements FileStore {
}
public static FileStoreImpl createWithValueCount(
+ SchemaManager schemaManager,
long schemaId,
FileStoreOptions options,
String user,
@@ -267,6 +282,7 @@ public class FileStoreImpl implements FileStore {
new LogicalType[] {new BigIntType(false)}, new
String[] {"_VALUE_COUNT"});
MergeFunction mergeFunction = new ValueCountMergeFunction();
return new FileStoreImpl(
+ schemaManager,
schemaId,
options,
WriteMode.CHANGE_LOG,
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
index aee265a9..e85e3f5e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
@@ -56,6 +56,7 @@ public class DataFileMeta {
private final long minSequenceNumber;
private final long maxSequenceNumber;
+ private final long schemaId;
private final int level;
public static DataFileMeta forAppend(
@@ -64,7 +65,8 @@ public class DataFileMeta {
long rowCount,
BinaryTableStats rowStats,
long minSequenceNumber,
- long maxSequenceNumber) {
+ long maxSequenceNumber,
+ long schemaId) {
return new DataFileMeta(
fileName,
fileSize,
@@ -75,6 +77,7 @@ public class DataFileMeta {
rowStats,
minSequenceNumber,
maxSequenceNumber,
+ schemaId,
DUMMY_LEVEL);
}
@@ -88,6 +91,7 @@ public class DataFileMeta {
BinaryTableStats valueStats,
long minSequenceNumber,
long maxSequenceNumber,
+ long schemaId,
int level) {
this.fileName = fileName;
this.fileSize = fileSize;
@@ -101,6 +105,7 @@ public class DataFileMeta {
this.minSequenceNumber = minSequenceNumber;
this.maxSequenceNumber = maxSequenceNumber;
this.level = level;
+ this.schemaId = schemaId;
}
public String fileName() {
@@ -139,6 +144,10 @@ public class DataFileMeta {
return maxSequenceNumber;
}
+ public long schemaId() {
+ return schemaId;
+ }
+
public int level() {
return level;
}
@@ -155,6 +164,7 @@ public class DataFileMeta {
valueStats,
minSequenceNumber,
maxSequenceNumber,
+ schemaId,
newLevel);
}
@@ -173,6 +183,7 @@ public class DataFileMeta {
&& Objects.equals(valueStats, that.valueStats)
&& minSequenceNumber == that.minSequenceNumber
&& maxSequenceNumber == that.maxSequenceNumber
+ && schemaId == that.schemaId
&& level == that.level;
}
@@ -188,13 +199,14 @@ public class DataFileMeta {
valueStats,
minSequenceNumber,
maxSequenceNumber,
+ schemaId,
level);
}
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d}",
+ "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d}",
fileName,
fileSize,
rowCount,
@@ -204,6 +216,7 @@ public class DataFileMeta {
valueStats,
minSequenceNumber,
maxSequenceNumber,
+ schemaId,
level);
}
@@ -218,6 +231,7 @@ public class DataFileMeta {
fields.add(new RowType.RowField("_VALUE_STATS",
FieldStatsArraySerializer.schema()));
fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new
BigIntType(false)));
fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new
BigIntType(false)));
+ fields.add(new RowType.RowField("_SCHEMA_ID", new BigIntType(false)));
fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
return new RowType(fields);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
index ef1db428..6050a8fb 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
@@ -48,6 +48,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
meta.valueStats().toRowData(),
meta.minSequenceNumber(),
meta.maxSequenceNumber(),
+ meta.schemaId(),
meta.level());
}
@@ -63,6 +64,7 @@ public class DataFileMetaSerializer extends
ObjectSerializer<DataFileMeta> {
BinaryTableStats.fromRowData(row.getRow(6, 3)),
row.getLong(7),
row.getLong(8),
- row.getInt(9));
+ row.getLong(9),
+ row.getInt(10));
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
index fccd7a6e..aecf25ae 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RecordReader;
@@ -45,16 +46,24 @@ import java.io.IOException;
*/
public class DataFileReader {
+ private final SchemaManager schemaManager;
+ private final long schemaId;
private final RowType keyType;
private final RowType valueType;
+
+ // TODO introduce Map<SchemaId, readerFactory>
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final DataFilePathFactory pathFactory;
private DataFileReader(
+ SchemaManager schemaManager,
+ long schemaId,
RowType keyType,
RowType valueType,
BulkFormat<RowData, FileSourceSplit> readerFactory,
DataFilePathFactory pathFactory) {
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
this.readerFactory = readerFactory;
@@ -104,6 +113,8 @@ public class DataFileReader {
@Override
public KeyValue next() throws IOException {
RecordAndPosition<RowData> result = iterator.next();
+
+ // TODO schema evolution
return result == null ? null :
serializer.fromRow(result.getRecord());
}
@@ -116,6 +127,8 @@ public class DataFileReader {
/** Creates {@link DataFileReader}. */
public static class Factory {
+ private final SchemaManager schemaManager;
+ private final long schemaId;
private final RowType keyType;
private final RowType valueType;
private final FileFormat fileFormat;
@@ -127,10 +140,14 @@ public class DataFileReader {
private RowType projectedValueType;
public Factory(
+ SchemaManager schemaManager,
+ long schemaId,
RowType keyType,
RowType valueType,
FileFormat fileFormat,
FileStorePathFactory pathFactory) {
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
this.fileFormat = fileFormat;
@@ -158,6 +175,8 @@ public class DataFileReader {
int[][] projection =
KeyValue.project(keyProjection, valueProjection,
keyType.getFieldCount());
return new DataFileReader(
+ schemaManager,
+ schemaId,
projectedKeyType,
projectedValueType,
fileFormat.createReaderFactory(recordType, projection),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
index 096daad7..91ac96d9 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java
@@ -57,6 +57,7 @@ public class DataFileWriter {
private static final Logger LOG =
LoggerFactory.getLogger(DataFileWriter.class);
+ private final long schemaId;
private final RowType keyType;
private final RowType valueType;
private final BulkWriter.Factory<RowData> writerFactory;
@@ -67,12 +68,14 @@ public class DataFileWriter {
private final long suggestedFileSize;
private DataFileWriter(
+ long schemaId,
RowType keyType,
RowType valueType,
BulkWriter.Factory<RowData> writerFactory,
@Nullable FileStatsExtractor fileStatsExtractor,
DataFilePathFactory pathFactory,
long suggestedFileSize) {
+ this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
this.writerFactory = writerFactory;
@@ -199,6 +202,7 @@ public class DataFileWriter {
valueStats,
minSeqNumber,
maxSeqNumber,
+ schemaId,
level);
}
}
@@ -235,6 +239,7 @@ public class DataFileWriter {
/** Creates {@link DataFileWriter}. */
public static class Factory {
+ private final long schemaId;
private final RowType keyType;
private final RowType valueType;
private final FileFormat fileFormat;
@@ -242,11 +247,13 @@ public class DataFileWriter {
private final long suggestedFileSize;
public Factory(
+ long schemaId,
RowType keyType,
RowType valueType,
FileFormat fileFormat,
FileStorePathFactory pathFactory,
long suggestedFileSize) {
+ this.schemaId = schemaId;
this.keyType = keyType;
this.valueType = valueType;
this.fileFormat = fileFormat;
@@ -257,6 +264,7 @@ public class DataFileWriter {
public DataFileWriter create(BinaryRowData partition, int bucket) {
RowType recordType = KeyValue.schema(keyType, valueType);
return new DataFileWriter(
+ schemaId,
keyType,
valueType,
fileFormat.createWriterFactory(recordType),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 9aa30c3c..8b9d43d5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -53,6 +54,8 @@ public class ManifestFile {
private static final Logger LOG =
LoggerFactory.getLogger(ManifestFile.class);
+ private final SchemaManager schemaManager;
+ private final long schemaId;
private final RowType partitionType;
private final ManifestEntrySerializer serializer;
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
@@ -61,6 +64,8 @@ public class ManifestFile {
private final FileWriter.Factory<ManifestEntry, Metric> fileWriterFactory;
private ManifestFile(
+ SchemaManager schemaManager,
+ long schemaId,
RowType partitionType,
RowType entryType,
ManifestEntrySerializer serializer,
@@ -69,6 +74,8 @@ public class ManifestFile {
FileStatsExtractor fileStatsExtractor,
FileStorePathFactory pathFactory,
long suggestedFileSize) {
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
this.partitionType = partitionType;
this.serializer = serializer;
this.readerFactory = readerFactory;
@@ -160,7 +167,8 @@ public class ManifestFile {
path.getFileSystem().getFileStatus(path).getLen(),
numAddedFiles,
numDeletedFiles,
- partitionStatsCollector.extract());
+ partitionStatsCollector.extract(),
+ schemaId);
}
}
@@ -193,16 +201,22 @@ public class ManifestFile {
*/
public static class Factory {
+ private final SchemaManager schemaManager;
+ private final long schemaId;
private final RowType partitionType;
private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
public Factory(
+ SchemaManager schemaManager,
+ long schemaId,
RowType partitionType,
FileFormat fileFormat,
FileStorePathFactory pathFactory,
long suggestedFileSize) {
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
this.partitionType = partitionType;
this.fileFormat = fileFormat;
this.pathFactory = pathFactory;
@@ -212,6 +226,8 @@ public class ManifestFile {
public ManifestFile create() {
RowType entryType =
VersionedObjectSerializer.versionType(ManifestEntry.schema());
return new ManifestFile(
+ schemaManager,
+ schemaId,
partitionType,
entryType,
new ManifestEntrySerializer(),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index 80d2bbd6..e80da5f7 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -39,18 +39,21 @@ public class ManifestFileMeta {
private final long numAddedFiles;
private final long numDeletedFiles;
private final BinaryTableStats partitionStats;
+ private final long schemaId;
public ManifestFileMeta(
String fileName,
long fileSize,
long numAddedFiles,
long numDeletedFiles,
- BinaryTableStats partitionStats) {
+ BinaryTableStats partitionStats,
+ long schemaId) {
this.fileName = fileName;
this.fileSize = fileSize;
this.numAddedFiles = numAddedFiles;
this.numDeletedFiles = numDeletedFiles;
this.partitionStats = partitionStats;
+ this.schemaId = schemaId;
}
public String fileName() {
@@ -73,6 +76,10 @@ public class ManifestFileMeta {
return partitionStats;
}
+ public long schemaId() {
+ return schemaId;
+ }
+
public static RowType schema() {
List<RowType.RowField> fields = new ArrayList<>();
fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false,
Integer.MAX_VALUE)));
@@ -80,6 +87,7 @@ public class ManifestFileMeta {
fields.add(new RowType.RowField("_NUM_ADDED_FILES", new
BigIntType(false)));
fields.add(new RowType.RowField("_NUM_DELETED_FILES", new
BigIntType(false)));
fields.add(new RowType.RowField("_PARTITION_STATS",
FieldStatsArraySerializer.schema()));
+ fields.add(new RowType.RowField("_SCHEMA_ID", new BigIntType(false)));
return new RowType(fields);
}
@@ -93,19 +101,21 @@ public class ManifestFileMeta {
&& fileSize == that.fileSize
&& numAddedFiles == that.numAddedFiles
&& numDeletedFiles == that.numDeletedFiles
- && Objects.equals(partitionStats, that.partitionStats);
+ && Objects.equals(partitionStats, that.partitionStats)
+ && schemaId == that.schemaId;
}
@Override
public int hashCode() {
- return Objects.hash(fileName, fileSize, numAddedFiles,
numDeletedFiles, partitionStats);
+ return Objects.hash(
+ fileName, fileSize, numAddedFiles, numDeletedFiles,
partitionStats, schemaId);
}
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %d, %s}",
- fileName, fileSize, numAddedFiles, numDeletedFiles,
partitionStats);
+ "{%s, %d, %d, %d, %s, %d}",
+ fileName, fileSize, numAddedFiles, numDeletedFiles,
partitionStats, schemaId);
}
/**
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
index ee306daa..37ee023b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaSerializer.java
@@ -40,13 +40,13 @@ public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<Manife
@Override
public RowData convertTo(ManifestFileMeta meta) {
- GenericRowData row = new GenericRowData(5);
- row.setField(0, StringData.fromString(meta.fileName()));
- row.setField(1, meta.fileSize());
- row.setField(2, meta.numAddedFiles());
- row.setField(3, meta.numDeletedFiles());
- row.setField(4, meta.partitionStats().toRowData());
- return row;
+ return GenericRowData.of(
+ StringData.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.numAddedFiles(),
+ meta.numDeletedFiles(),
+ meta.partitionStats().toRowData(),
+ meta.schemaId());
}
@Override
@@ -65,6 +65,7 @@ public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<Manife
row.getLong(1),
row.getLong(2),
row.getLong(3),
- BinaryTableStats.fromRowData(row.getRow(4, 3)));
+ BinaryTableStats.fromRowData(row.getRow(4, 3)),
+ row.getLong(5));
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index 995ba622..5d6ef5ca 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -29,6 +29,7 @@ import
org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.types.logical.RowType;
@@ -53,6 +54,8 @@ public class FileStoreReadImpl implements FileStoreRead {
private boolean dropDelete = true;
public FileStoreReadImpl(
+ SchemaManager schemaManager,
+ long schemaId,
WriteMode writeMode,
RowType keyType,
RowType valueType,
@@ -61,7 +64,8 @@ public class FileStoreReadImpl implements FileStoreRead {
FileFormat fileFormat,
FileStorePathFactory pathFactory) {
this.dataFileReaderFactory =
- new DataFileReader.Factory(keyType, valueType, fileFormat,
pathFactory);
+ new DataFileReader.Factory(
+ schemaManager, schemaId, keyType, valueType,
fileFormat, pathFactory);
this.writeMode = writeMode;
this.keyComparator = keyComparator;
this.mergeFunction = mergeFunction;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index fba8c330..b28653f1 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -37,6 +37,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -58,6 +59,8 @@ import java.util.function.Supplier;
public class FileStoreWriteImpl implements FileStoreWrite {
private final WriteMode writeMode;
+ private final SchemaManager schemaManager;
+ private final long schemaId;
private final RowType valueType;
private final DataFileReader.Factory dataFileReaderFactory;
private final DataFileWriter.Factory dataFileWriterFactory;
@@ -71,6 +74,8 @@ public class FileStoreWriteImpl implements FileStoreWrite {
public FileStoreWriteImpl(
WriteMode writeMode,
+ SchemaManager schemaManager,
+ long schemaId,
RowType keyType,
RowType valueType,
Supplier<Comparator<RowData>> keyComparatorSupplier,
@@ -80,12 +85,20 @@ public class FileStoreWriteImpl implements FileStoreWrite {
SnapshotManager snapshotManager,
FileStoreScan scan,
MergeTreeOptions options) {
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
this.valueType = valueType;
this.dataFileReaderFactory =
- new DataFileReader.Factory(keyType, valueType, fileFormat,
pathFactory);
+ new DataFileReader.Factory(
+ schemaManager, schemaId, keyType, valueType,
fileFormat, pathFactory);
this.dataFileWriterFactory =
new DataFileWriter.Factory(
- keyType, valueType, fileFormat, pathFactory,
options.targetFileSize);
+ schemaId,
+ keyType,
+ valueType,
+ fileFormat,
+ pathFactory,
+ options.targetFileSize);
this.writeMode = writeMode;
this.fileFormat = fileFormat;
this.keyComparatorSupplier = keyComparatorSupplier;
@@ -121,7 +134,12 @@ public class FileStoreWriteImpl implements FileStoreWrite {
.orElse(-1L);
return new AppendOnlyWriter(
- fileFormat, options.targetFileSize, valueType,
maxSeqNum, factory);
+ schemaId,
+ fileFormat,
+ options.targetFileSize,
+ valueType,
+ maxSeqNum,
+ factory);
case CHANGE_LOG:
if (latestSnapshotId == null) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index ac835fc1..2613c4a4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
+import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
@@ -37,24 +38,16 @@ import java.util.stream.Collectors;
import static
org.apache.flink.table.store.file.utils.FileUtils.listVersionedFiles;
/** Schema Manager to manage schema versions. */
-public class SchemaManager {
+public class SchemaManager implements Serializable {
private static final String SCHEMA_PREFIX = "schema-";
private final Path tableRoot;
- /** Default no lock. */
- private Lock lock = Callable::call;
-
public SchemaManager(Path tableRoot) {
this.tableRoot = tableRoot;
}
- public SchemaManager withLock(Lock lock) {
- this.lock = lock;
- return this;
- }
-
/** @return latest schema. */
public Optional<Schema> latest() {
try {
@@ -83,6 +76,11 @@ public class SchemaManager {
/** Create a new schema from {@link UpdateSchema}. */
public Schema commitNewVersion(UpdateSchema updateSchema) throws Exception {
+ return commitNewVersion(Callable::call, updateSchema);
+ }
+
+ /** Create a new schema from {@link UpdateSchema}. */
+ public Schema commitNewVersion(Lock lock, UpdateSchema updateSchema) throws Exception {
RowType rowType = updateSchema.rowType();
List<String> partitionKeys = updateSchema.partitionKeys();
List<String> primaryKeys = updateSchema.primaryKeys();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
index d5343319..a3842c77 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
@@ -43,6 +43,7 @@ import java.util.function.Supplier;
* operations and don't have any unique keys or sort keys.
*/
public class AppendOnlyWriter implements RecordWriter {
+ private final long schemaId;
private final long targetFileSize;
private final DataFilePathFactory pathFactory;
private final FieldStatsArraySerializer statsArraySerializer;
@@ -53,12 +54,13 @@ public class AppendOnlyWriter implements RecordWriter {
private RowRollingWriter writer;
public AppendOnlyWriter(
+ long schemaId,
FileFormat fileFormat,
long targetFileSize,
RowType writeSchema,
long maxWroteSeqNumber,
DataFilePathFactory pathFactory) {
-
+ this.schemaId = schemaId;
this.targetFileSize = targetFileSize;
this.pathFactory = pathFactory;
this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
@@ -158,7 +160,8 @@ public class AppendOnlyWriter implements RecordWriter {
recordCount(),
stats,
minSeqNum,
- Math.max(minSeqNum, nextSeqNum - 1));
+ Math.max(minSeqNum, nextSeqNum - 1),
+ schemaId);
}
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 26472592..ff0c8539 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -46,10 +47,11 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
private final FileStoreImpl store;
- AppendOnlyFileStoreTable(String name, Schema schema, String user) {
+ AppendOnlyFileStoreTable(String name, SchemaManager schemaManager, Schema schema, String user) {
super(name, schema);
this.store =
new FileStoreImpl(
+ schemaManager,
schema.id(),
new FileStoreOptions(schema.options()),
WriteMode.APPEND_ONLY,
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index caa98e3b..79785d6e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -29,6 +29,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -48,7 +49,8 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
private final FileStoreImpl store;
- ChangelogValueCountFileStoreTable(String name, Schema schema, String user) {
+ ChangelogValueCountFileStoreTable(
+ String name, SchemaManager schemaManager, Schema schema, String user) {
super(name, schema);
RowType countType =
RowType.of(
@@ -56,6 +58,7 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
MergeFunction mergeFunction = new ValueCountMergeFunction();
this.store =
new FileStoreImpl(
+ schemaManager,
schema.id(),
new FileStoreOptions(schema.options()),
WriteMode.CHANGE_LOG,
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index f02259e6..6d611638 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -31,6 +31,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFun
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.Schema;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -54,7 +55,8 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
private final FileStoreImpl store;
- ChangelogWithKeyFileStoreTable(String name, Schema schema, String user) {
+ ChangelogWithKeyFileStoreTable(
+ String name, SchemaManager schemaManager, Schema schema, String user) {
super(name, schema);
RowType rowType = schema.logicalRowType();
@@ -92,6 +94,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
this.store =
new FileStoreImpl(
+ schemaManager,
schema.id(),
new FileStoreOptions(conf),
WriteMode.CHANGE_LOG,
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index ae2feb62..3f2d4c97 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -39,8 +39,9 @@ public class FileStoreTableFactory {
public static FileStoreTable create(Configuration conf, String user) {
Path tablePath = FileStoreOptions.path(conf);
String name = tablePath.getName();
+ SchemaManager schemaManager = new SchemaManager(tablePath);
Schema schema =
- new SchemaManager(tablePath)
+ schemaManager
.latest()
.orElseThrow(
() ->
@@ -55,12 +56,12 @@ public class FileStoreTableFactory {
schema = schema.copy(newOptions);
if (conf.get(FileStoreOptions.WRITE_MODE) == WriteMode.APPEND_ONLY) {
- return new AppendOnlyFileStoreTable(name, schema, user);
+ return new AppendOnlyFileStoreTable(name, schemaManager, schema, user);
} else {
if (schema.primaryKeys().isEmpty()) {
- return new ChangelogValueCountFileStoreTable(name, schema, user);
+ return new ChangelogValueCountFileStoreTable(name, schemaManager, schema, user);
} else {
- return new ChangelogWithKeyFileStoreTable(name, schema, user);
- return new ChangelogWithKeyFileStoreTable(name, schema, user);
+ return new ChangelogWithKeyFileStoreTable(name, schemaManager, schema, user);
}
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index a8180385..7b872218 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -36,6 +36,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
@@ -111,6 +112,7 @@ public class TestFileStore extends FileStoreImpl {
RowType valueType,
MergeFunction mergeFunction) {
super(
+ new SchemaManager(options.path()),
0L,
options,
WriteMode.CHANGE_LOG,
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
index 781a91e6..81694a3f 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializerTest;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -207,6 +208,7 @@ public class DataFileTest {
format);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) +
1024;
return new DataFileWriter.Factory(
+ 0,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
// normal format will buffer changes in memory and we
can't determine
@@ -223,6 +225,8 @@ public class DataFileTest {
FileStorePathFactory pathFactory = new FileStorePathFactory(new
Path(path));
DataFileReader.Factory factory =
new DataFileReader.Factory(
+ new SchemaManager(new Path(path)),
+ 0,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
new FlushingFileFormat(format),
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
index 2163ed62..86345713 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestDataGenerator.java
@@ -137,6 +137,7 @@ public class DataFileTestDataGenerator {
valueStatsCollector.extract(),
minSequenceNumber,
maxSequenceNumber,
+ 0,
level),
kvs);
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 4ed387ac..df629d52 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -96,6 +96,7 @@ public class ManifestCommittableSerializerTest {
newTableStats(0, 1),
0,
1,
+ 0,
level);
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index bcb6d76f..32b0051e 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -143,6 +144,8 @@ public class ManifestFileMetaTest {
private ManifestFile createManifestFile(String path) {
return new ManifestFile.Factory(
+ new SchemaManager(new Path(path)),
+ 0,
PARTITION_TYPE,
avro,
new FileStorePathFactory(
@@ -222,7 +225,8 @@ public class ManifestFileMetaTest {
entries.length * 100, // for testing purpose
writtenMeta.numAddedFiles(),
writtenMeta.numDeletedFiles(),
- writtenMeta.partitionStats());
+ writtenMeta.partitionStats(),
+ 0);
}
private ManifestEntry makeEntry(boolean isAdd, String fileName) {
@@ -246,6 +250,7 @@ public class ManifestFileMetaTest {
StatsTestUtils.newEmptyTableStats(), // not used
0, // not used
0, // not used
+ 0, // not used
0 // not used
));
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index 535d63dd..1f6cf287 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -100,7 +101,13 @@ public class ManifestFileTest {
"default",
FileStoreOptions.FILE_FORMAT.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) +
1024;
- return new ManifestFile.Factory(DEFAULT_PART_TYPE, avro, pathFactory, suggestedFileSize)
+ return new ManifestFile.Factory(
+ new SchemaManager(new Path(path)),
+ 0,
+ DEFAULT_PART_TYPE,
+ avro,
+ pathFactory,
+ suggestedFileSize)
.create();
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index 659ae3b8..11fc3813 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -101,7 +101,8 @@ public class ManifestTestDataGenerator {
entries.size() * 100L,
numAddedFiles,
numDeletedFiles,
- collector.extract());
+ collector.extract(),
+ 0);
}
private void mergeLevelsIfNeeded(BinaryRowData partition, int bucket) {
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
index acc78b7a..674bb2ec 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -62,6 +62,6 @@ public class LevelsTest {
}
public static DataFileMeta newFile(int level) {
- return new DataFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, level);
+ return new DataFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, 0, level);
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 5683ace2..331e8a9e 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -37,6 +37,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
+import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -74,6 +75,7 @@ public class MergeTreeTest {
@TempDir java.nio.file.Path tempDir;
private static ExecutorService service;
+ private Path path;
private FileStorePathFactory pathFactory;
private Comparator<RowData> comparator;
@@ -84,7 +86,8 @@ public class MergeTreeTest {
@BeforeEach
public void beforeEach() throws IOException {
- pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
+ path = new Path(tempDir.toString());
+ pathFactory = new FileStorePathFactory(path);
comparator = Comparator.comparingInt(o -> o.getInt(0));
recreateMergeTree(1024 * 1024);
Path bucketDir =
dataFileWriter.pathFactory().toPath("ignore").getParent();
@@ -101,10 +104,17 @@ public class MergeTreeTest {
RowType valueType = new RowType(singletonList(new
RowType.RowField("v", new IntType())));
FileFormat flushingAvro = new FlushingFileFormat("avro");
dataFileReader =
- new DataFileReader.Factory(keyType, valueType, flushingAvro, pathFactory)
+ new DataFileReader.Factory(
+ new SchemaManager(path),
+ 0,
+ keyType,
+ valueType,
+ flushingAvro,
+ pathFactory)
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
dataFileWriter =
new DataFileWriter.Factory(
+ 0,
keyType,
valueType,
flushingAvro,
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 98c0b58b..bffac73e 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -214,6 +214,7 @@ public class CompactManagerTest {
null,
0,
maxSequence,
+ 0,
level);
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
index e4ed061f..332de1a7 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
@@ -176,6 +176,7 @@ public class IntervalPartitionTest {
StatsTestUtils.newEmptyTableStats(), // not used
0,
24,
+ 0,
0);
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index 5f388690..7dcb6f61 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -227,6 +227,6 @@ public class UniversalCompactionTest {
}
private DataFileMeta file(long size) {
- return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0);
+ return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0);
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
index 59cce2c7..f5927de3 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
@@ -226,6 +226,6 @@ public class AppendOnlyWriterTest {
Thread.currentThread().getContextClassLoader(), AVRO,
new Configuration());
return new AppendOnlyWriter(
- fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
+ 0, fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index b69ebb1f..22460fee 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -173,15 +173,15 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
conf.set(FileStoreOptions.PATH, tablePath.toString());
conf.set(FileStoreOptions.FILE_FORMAT, "avro");
conf.set(FileStoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ SchemaManager schemaManager = new SchemaManager(tablePath);
Schema schema =
- new SchemaManager(tablePath)
- .commitNewVersion(
- new UpdateSchema(
- ROW_TYPE,
- Collections.singletonList("pt"),
- Collections.emptyList(),
- conf.toMap(),
- ""));
- return new AppendOnlyFileStoreTable(tablePath.getName(), schema, "user");
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return new AppendOnlyFileStoreTable(tablePath.getName(), schemaManager, schema, "user");
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index bed0cad7..7154062c 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -170,15 +170,16 @@ public class ChangelogValueCountFileStoreTableTest
extends FileStoreTableTestBas
conf.set(FileStoreOptions.PATH, tablePath.toString());
conf.set(FileStoreOptions.FILE_FORMAT, "avro");
conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ SchemaManager schemaManager = new SchemaManager(tablePath);
Schema schema =
- new SchemaManager(tablePath)
- .commitNewVersion(
- new UpdateSchema(
- ROW_TYPE,
- Collections.singletonList("pt"),
- Collections.emptyList(),
- conf.toMap(),
- ""));
- return new ChangelogValueCountFileStoreTable(tablePath.getName(), schema, "user");
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return new ChangelogValueCountFileStoreTable(
+ tablePath.getName(), schemaManager, schema, "user");
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index e575651b..b5da3cd4 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -180,15 +180,16 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
conf.set(FileStoreOptions.PATH, tablePath.toString());
conf.set(FileStoreOptions.FILE_FORMAT, "avro");
conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ SchemaManager schemaManager = new SchemaManager(tablePath);
Schema schema =
- new SchemaManager(tablePath)
- .commitNewVersion(
- new UpdateSchema(
- ROW_TYPE,
- Collections.singletonList("pt"),
- Arrays.asList("pt", "a"),
- conf.toMap(),
- ""));
- return new ChangelogWithKeyFileStoreTable(tablePath.getName(), schema, "user");
+ schemaManager.commitNewVersion(
+ new UpdateSchema(
+ ROW_TYPE,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ conf.toMap(),
+ ""));
+ return new ChangelogWithKeyFileStoreTable(
+ tablePath.getName(), schemaManager, schema, "user");
}
}
diff --git
a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
index a7ff2005..53a031c5 100644
---
a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
+++
b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
@@ -66,7 +66,7 @@ public class FileFormatSuffixTest extends DataFileTest {
format,
new Configuration());
AppendOnlyWriter appendOnlyWriter =
- new AppendOnlyWriter(fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
+ new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
appendOnlyWriter.write(
ValueKind.ADD,
BinaryRowDataUtil.EMPTY_ROW,