This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new abaa59f46 [core] Reduce schema read for readers (#3021)
abaa59f46 is described below
commit abaa59f460268bc35151170fd48381fb0ddc54ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 18 10:17:31 2024 +0800
[core] Reduce schema read for readers (#3021)
---
.../java/org/apache/paimon/AbstractFileStore.java | 7 +++---
.../org/apache/paimon/AppendOnlyFileStore.java | 11 ++++----
.../java/org/apache/paimon/KeyValueFileStore.java | 14 +++++------
.../paimon/io/KeyValueFileReaderFactory.java | 25 ++++++++++---------
.../paimon/operation/AbstractFileStoreScan.java | 6 ++++-
.../paimon/operation/AppendOnlyFileStoreRead.java | 15 ++++++-----
.../paimon/operation/AppendOnlyFileStoreScan.java | 6 +++--
.../paimon/operation/KeyValueFileStoreRead.java | 6 ++---
.../paimon/operation/KeyValueFileStoreScan.java | 10 +++++---
.../paimon/operation/KeyValueFileStoreWrite.java | 7 +++---
.../paimon/table/AppendOnlyFileStoreTable.java | 2 +-
.../paimon/table/PrimaryKeyFileStoreTable.java | 2 +-
.../test/java/org/apache/paimon/TestFileStore.java | 29 ++++++++++++++++++----
.../paimon/io/KeyValueFileReadWriteTest.java | 2 +-
.../paimon/mergetree/ContainsLevelsTest.java | 2 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 2 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 2 +-
.../apache/paimon/operation/FileDeletionTest.java | 19 ++++++++------
.../paimon/operation/FileStoreCommitTest.java | 23 +++++++++--------
.../paimon/operation/FileStoreExpireTestBase.java | 3 ++-
.../operation/KeyValueFileStoreReadTest.java | 8 +++---
.../operation/KeyValueFileStoreScanTest.java | 3 ++-
.../flink/source/TestChangelogDataReadWrite.java | 11 ++++----
23 files changed, 129 insertions(+), 86 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 6413cd088..87cc4e65c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -34,6 +34,7 @@ import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
@@ -65,7 +66,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
protected final FileIO fileIO;
protected final SchemaManager schemaManager;
- protected final long schemaId;
+ protected final TableSchema schema;
protected final CoreOptions options;
protected final RowType partitionType;
private final CatalogEnvironment catalogEnvironment;
@@ -75,13 +76,13 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
public AbstractFileStore(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
CoreOptions options,
RowType partitionType,
CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
- this.schemaId = schemaId;
+ this.schema = schema;
this.options = options;
this.partitionType = partitionType;
this.catalogEnvironment = catalogEnvironment;
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 8be8f8178..0d546e215 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -28,6 +28,7 @@ import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
@@ -50,14 +51,14 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
public AppendOnlyFileStore(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
CoreOptions options,
RowType partitionType,
RowType bucketKeyType,
RowType rowType,
String tableName,
CatalogEnvironment catalogEnvironment) {
- super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
+ super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
this.tableName = tableName;
@@ -82,7 +83,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
return new AppendOnlyFileStoreRead(
fileIO,
schemaManager,
- schemaId,
+ schema,
rowType,
FileFormatDiscover.of(options),
pathFactory());
@@ -99,7 +100,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
- schemaId,
+ schema.id(),
commitUser,
rowType,
pathFactory(),
@@ -138,7 +139,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
bucketFilter,
snapshotManager(),
schemaManager,
- schemaId,
+ schema,
manifestFileFactory(forWrite),
manifestListFactory(forWrite),
options.bucket(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a44c91ba5..d80bd6d39 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -35,6 +35,7 @@ import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
@@ -75,7 +76,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
public KeyValueFileStore(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
boolean crossPartitionUpdate,
CoreOptions options,
RowType partitionType,
@@ -86,7 +87,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
MergeFunctionFactory<KeyValue> mfFactory,
String tableName,
CatalogEnvironment catalogEnvironment) {
- super(fileIO, schemaManager, schemaId, options, partitionType,
catalogEnvironment);
+ super(fileIO, schemaManager, schema, options, partitionType,
catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
@@ -121,8 +122,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
public KeyValueFileStoreRead newRead() {
return new KeyValueFileStoreRead(
options,
- schemaManager,
- schemaId,
+ schema,
keyType,
valueType,
newKeyComparator(),
@@ -134,7 +134,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
return KeyValueFileReaderFactory.builder(
fileIO,
schemaManager,
- schemaId,
+ schema,
keyType,
valueType,
FileFormatDiscover.of(options),
@@ -162,7 +162,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
- schemaId,
+ schema,
commitUser,
keyType,
valueType,
@@ -221,7 +221,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
bucketFilter,
snapshotManager(),
schemaManager,
- schemaId,
+ schema,
keyValueFieldsExtractor,
manifestFileFactory(forWrite),
manifestListFactory(forWrite),
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 63fef31fc..cc7534e9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -31,6 +31,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.BulkFormatMapping;
@@ -52,7 +53,7 @@ public class KeyValueFileReaderFactory {
private final FileIO fileIO;
private final SchemaManager schemaManager;
- private final long schemaId;
+ private final TableSchema schema;
private final RowType keyType;
private final RowType valueType;
@@ -67,7 +68,7 @@ public class KeyValueFileReaderFactory {
private KeyValueFileReaderFactory(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
RowType keyType,
RowType valueType,
BulkFormatMapping.BulkFormatMappingBuilder
bulkFormatMappingBuilder,
@@ -77,7 +78,7 @@ public class KeyValueFileReaderFactory {
DeletionVector.Factory dvFactory) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
- this.schemaId = schemaId;
+ this.schema = schema;
this.keyType = keyType;
this.valueType = valueType;
this.bulkFormatMappingBuilder = bulkFormatMappingBuilder;
@@ -110,8 +111,8 @@ public class KeyValueFileReaderFactory {
() ->
bulkFormatMappingBuilder.build(
formatIdentifier,
- schemaManager.schema(this.schemaId),
- schemaManager.schema(schemaId));
+ schema,
+ schemaId == schema.id() ? schema : schemaManager.schema(schemaId));
BulkFormatMapping bulkFormatMapping =
reuseFormat
@@ -141,7 +142,7 @@ public class KeyValueFileReaderFactory {
public static Builder builder(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
RowType keyType,
RowType valueType,
FileFormatDiscover formatDiscover,
@@ -151,7 +152,7 @@ public class KeyValueFileReaderFactory {
return new Builder(
fileIO,
schemaManager,
- schemaId,
+ schema,
keyType,
valueType,
formatDiscover,
@@ -165,7 +166,7 @@ public class KeyValueFileReaderFactory {
private final FileIO fileIO;
private final SchemaManager schemaManager;
- private final long schemaId;
+ private final TableSchema schema;
private final RowType keyType;
private final RowType valueType;
private final FileFormatDiscover formatDiscover;
@@ -182,7 +183,7 @@ public class KeyValueFileReaderFactory {
private Builder(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
RowType keyType,
RowType valueType,
FileFormatDiscover formatDiscover,
@@ -191,7 +192,7 @@ public class KeyValueFileReaderFactory {
CoreOptions options) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
- this.schemaId = schemaId;
+ this.schema = schema;
this.keyType = keyType;
this.valueType = valueType;
this.formatDiscover = formatDiscover;
@@ -209,7 +210,7 @@ public class KeyValueFileReaderFactory {
return new Builder(
fileIO,
schemaManager,
- schemaId,
+ schema,
keyType,
valueType,
formatDiscover,
@@ -255,7 +256,7 @@ public class KeyValueFileReaderFactory {
return new KeyValueFileReaderFactory(
fileIO,
schemaManager,
- schemaId,
+ schema,
projectedKeyType,
projectedValueType,
BulkFormatMapping.newBuilder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index e6c95e885..52983f4b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -72,6 +72,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
+ private final TableSchema schema;
protected final ScanBucketFilter bucketKeyFilter;
private final String branchName;
@@ -91,6 +92,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
+ TableSchema schema,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
@@ -101,6 +103,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
this.schemaManager = schemaManager;
+ this.schema = schema;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
@@ -407,7 +410,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
/** Note: Keep this thread-safe. */
protected TableSchema scanTableSchema(long id) {
- return tableSchemas.computeIfAbsent(id, key -> schemaManager.schema(id));
+ return tableSchemas.computeIfAbsent(
+ id, key -> key == schema.id() ? schema : schemaManager.schema(id));
}
/** Note: Keep this thread-safe. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index c06cce458..8363c297a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -62,7 +62,7 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
private final FileIO fileIO;
private final SchemaManager schemaManager;
- private final long schemaId;
+ private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
@@ -74,13 +74,13 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
public AppendOnlyFileStoreRead(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
RowType rowType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
- this.schemaId = schemaId;
+ this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.bulkFormatMappings = new HashMap<>();
@@ -113,8 +113,11 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
bulkFormatMappings.computeIfAbsent(
new FormatKey(file.schemaId(), formatIdentifier),
key -> {
- TableSchema tableSchema = schemaManager.schema(this.schemaId);
- TableSchema dataSchema = schemaManager.schema(key.schemaId);
+ TableSchema tableSchema = schema;
+ TableSchema dataSchema =
+ key.schemaId == schema.id()
+ ? schema
+ : schemaManager.schema(key.schemaId);
// projection to data schema
int[][] dataProjection =
@@ -131,7 +134,7 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
dataSchema.fields());
List<Predicate> dataFilters =
- this.schemaId == key.schemaId
+ this.schema.id() == key.schemaId
? filters
:
SchemaEvolutionUtil.createDataFilters(
tableSchema.fields(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 2cec4e064..866c87d75 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -24,6 +24,7 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.stats.FieldStatsConverters;
@@ -44,7 +45,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
ScanBucketFilter bucketFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
@@ -56,6 +57,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
bucketFilter,
snapshotManager,
schemaManager,
+ schema,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
@@ -63,7 +65,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
scanManifestParallelism,
branchName);
this.fieldStatsConverters =
- new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId);
+ new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
}
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index db4c0ac92..0e115fddd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -41,7 +41,6 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
@@ -89,14 +88,13 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
public KeyValueFileStoreRead(
CoreOptions options,
- SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
RowType keyType,
RowType valueType,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory,
KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
- this.tableSchema = schemaManager.schema(schemaId);
+ this.tableSchema = schema;
this.readerFactoryBuilder = readerFactoryBuilder;
this.fileIO = readerFactoryBuilder.fileIO();
this.keyComparator = keyComparator;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 8e37d1e4f..0f34cac5a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.stats.FieldStatsConverters;
@@ -50,7 +51,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
ScanBucketFilter bucketFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
KeyValueFieldsExtractor keyValueFieldsExtractor,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
@@ -64,6 +65,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
bucketFilter,
snapshotManager,
schemaManager,
+ schema,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
@@ -72,10 +74,12 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
branchName);
this.fieldKeyStatsConverters =
new FieldStatsConverters(
- sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
+ sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
+ schema.id());
this.fieldValueStatsConverters =
new FieldStatsConverters(
- sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schemaId);
+ sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)),
+ schema.id());
this.deletionVectorsEnabled = deletionVectorsEnabled;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 41b77d612..e06ed9ea2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -60,6 +60,7 @@ import
org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
@@ -102,7 +103,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
public KeyValueFileStoreWrite(
FileIO fileIO,
SchemaManager schemaManager,
- long schemaId,
+ TableSchema schema,
String commitUser,
RowType keyType,
RowType valueType,
@@ -135,7 +136,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
KeyValueFileReaderFactory.builder(
fileIO,
schemaManager,
- schemaId,
+ schema,
keyType,
valueType,
FileFormatDiscover.of(options),
@@ -145,7 +146,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
this.writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
fileIO,
- schemaId,
+ schema.id(),
keyType,
valueType,
options.fileFormat(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 9c97d406c..4d91328f2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -77,7 +77,7 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable
{
new AppendOnlyFileStore(
fileIO,
schemaManager(),
- tableSchema.id(),
+ tableSchema,
new CoreOptions(tableSchema.options()),
tableSchema.logicalPartitionType(),
tableSchema.logicalBucketKeyType(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 7b30fb832..f35afc64d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -98,7 +98,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable
{
new KeyValueFileStore(
fileIO(),
schemaManager(),
- tableSchema.id(),
+ tableSchema,
tableSchema.crossPartitionUpdate(),
options,
tableSchema.logicalPartitionType(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index d9cc7f878..43e0297fd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -44,6 +44,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.ExpireSnapshotsImpl;
@@ -103,11 +104,21 @@ public class TestFileStore extends KeyValueFileStore {
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
- MergeFunctionFactory<KeyValue> mfFactory) {
+ MergeFunctionFactory<KeyValue> mfFactory,
+ TableSchema tableSchema) {
super(
FileIOFinder.find(new Path(root)),
- new SchemaManager(FileIOFinder.find(new Path(root)), options.path()),
- 0L,
+ schemaManager(root, options),
+ tableSchema != null
+ ? tableSchema
+ : new TableSchema(
+ 0L,
+ valueType.getFields(),
+ valueType.getFieldCount(),
+ partitionType.getFieldNames(),
+ keyType.getFieldNames(),
+ Collections.emptyMap(),
+ null),
false,
options,
partitionType,
@@ -127,6 +138,10 @@ public class TestFileStore extends KeyValueFileStore {
this.commitIdentifier = 0L;
}
+ private static SchemaManager schemaManager(String root, CoreOptions options) {
+ return new SchemaManager(FileIOFinder.find(new Path(root)), options.path());
+ }
+
public AbstractFileStoreWrite<KeyValue> newWrite() {
return super.newWrite(commitUser);
}
@@ -563,6 +578,7 @@ public class TestFileStore extends KeyValueFileStore {
private final RowType valueType;
private final KeyValueFieldsExtractor keyValueFieldsExtractor;
private final MergeFunctionFactory<KeyValue> mfFactory;
+ private final TableSchema tableSchema;
private CoreOptions.ChangelogProducer changelogProducer;
@@ -574,7 +590,8 @@ public class TestFileStore extends KeyValueFileStore {
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
- MergeFunctionFactory<KeyValue> mfFactory) {
+ MergeFunctionFactory<KeyValue> mfFactory,
+ TableSchema tableSchema) {
this.format = format;
this.root = root;
this.numBuckets = numBuckets;
@@ -583,6 +600,7 @@ public class TestFileStore extends KeyValueFileStore {
this.valueType = valueType;
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
this.mfFactory = mfFactory;
+ this.tableSchema = tableSchema;
this.changelogProducer = CoreOptions.ChangelogProducer.NONE;
}
@@ -620,7 +638,8 @@ public class TestFileStore extends KeyValueFileStore {
keyType,
valueType,
keyValueFieldsExtractor,
- mfFactory);
+ mfFactory,
+ tableSchema);
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 05f260097..ca7f75d6e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -282,7 +282,7 @@ public class KeyValueFileReadWriteTest {
KeyValueFileReaderFactory.builder(
fileIO,
createTestSchemaManager(path),
- 0,
+ createTestSchemaManager(path).schema(0),
KEY_TYPE,
DEFAULT_ROW_TYPE,
ignore -> new FlushingFileFormat(format),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 58d9dbe90..d53b94155 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -235,7 +235,7 @@ public class ContainsLevelsTest {
KeyValueFileReaderFactory.builder(
FileIOFinder.find(path),
createSchemaManager(path),
- 0,
+ createSchemaManager(path).schema(0),
keyType,
rowType,
ignore -> new FlushingFileFormat("avro"),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 00d8eeb5a..f4d27784a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -315,7 +315,7 @@ public class LookupLevelsTest {
KeyValueFileReaderFactory.builder(
FileIOFinder.find(path),
createSchemaManager(path),
- 0,
+ createSchemaManager(path).schema(0),
keyType,
rowType,
ignore -> new FlushingFileFormat("avro"),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index f6ddf74ea..a3bcc9bfa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -147,7 +147,7 @@ public abstract class MergeTreeTestBase {
KeyValueFileReaderFactory.builder(
LocalFileIO.create(),
createTestingSchemaManager(path),
- 0,
+ createTestingSchemaManager(path).schema(0),
keyType,
valueType,
ignore -> flushingAvro,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 49e55ee4e..9994c0809 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
@@ -664,13 +665,14 @@ public class FileDeletionTest {
}
SchemaManager schemaManager = new SchemaManager(fileIO, new
Path(root));
- schemaManager.createTable(
- new Schema(
- rowType.getFields(),
- partitionType.getFieldNames(),
- TestKeyValueGenerator.getPrimaryKeys(mode),
- Collections.emptyMap(),
- null));
+ TableSchema tableSchema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ partitionType.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(mode),
+ Collections.emptyMap(),
+ null));
return new TestFileStore.Builder(
"avro",
@@ -680,7 +682,8 @@ public class FileDeletionTest {
TestKeyValueGenerator.KEY_TYPE,
rowType,
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
- DeduplicateMergeFunction.factory())
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
.changelogProducer(changelogProducer)
.build();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 488f74f67..b0cec3f44 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -35,6 +35,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
@@ -856,15 +857,16 @@ public class FileStoreCommitTest {
? FailingFileIO.getFailingPath(failingName,
tempDir.toString())
: TraceableFileIO.SCHEME + "://" + tempDir.toString();
Path path = new Path(tempDir.toUri());
- SchemaUtils.forceCommit(
- new SchemaManager(new LocalFileIO(), path),
- new Schema(
- TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
- TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
- TestKeyValueGenerator.getPrimaryKeys(
- TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
- Collections.emptyMap(),
- null));
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(new LocalFileIO(), path),
+ new Schema(
+
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.emptyMap(),
+ null));
return new TestFileStore.Builder(
"avro",
root,
@@ -873,7 +875,8 @@ public class FileStoreCommitTest {
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
- DeduplicateMergeFunction.factory())
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
.changelogProducer(changelogProducer)
.build();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
index d9b3469f9..5ebe76ed6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
@@ -86,7 +86,8 @@ public abstract class FileStoreExpireTestBase {
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
- DeduplicateMergeFunction.factory())
+ DeduplicateMergeFunction.factory(),
+ null)
.changelogProducer(changelogProducer)
.build();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
index a3c0142e7..07bf705e2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
@@ -268,7 +268,7 @@ public class KeyValueFileStoreReadTest {
Path path = new Path(tempDir.toUri());
SchemaManager schemaManager = new SchemaManager(FileIOFinder.find(path), path);
boolean valueCountMode = mfFactory.create() instanceof TestValueCountMergeFunction;
- schemaManager.createTable(
+ Schema schema =
new Schema(
(valueCountMode ? keyType : valueType).getFields(),
partitionType.getFieldNames(),
@@ -280,7 +280,8 @@ public class KeyValueFileStoreReadTest {
partitionType.getFieldNames().stream())
.collect(Collectors.toList()),
Collections.emptyMap(),
- null));
+ null);
+ TableSchema tableSchema = schemaManager.createTable(schema);
return new TestFileStore.Builder(
"avro",
tempDir.toString(),
@@ -289,7 +290,8 @@ public class KeyValueFileStoreReadTest {
keyType,
valueType,
extractor,
- mfFactory)
+ mfFactory,
+ tableSchema)
.build();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index 74dee018e..7f1fc0d3c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -76,7 +76,8 @@ public class KeyValueFileStoreScanTest {
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
- DeduplicateMergeFunction.factory())
+ DeduplicateMergeFunction.factory(),
+ null)
.build();
snapshotManager = store.snapshotManager();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 7ffa94809..d51aefb96 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -119,13 +119,11 @@ public class TestChangelogDataReadWrite {
RecordReader.RecordIterator<InternalRow>>
rowDataIteratorCreator) {
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
- long schemaId = 0;
CoreOptions options = new CoreOptions(new HashMap<>());
KeyValueFileStoreRead read =
new KeyValueFileStoreRead(
options,
- schemaManager,
- schemaId,
+ schemaManager.schema(0),
KEY_TYPE,
VALUE_TYPE,
COMPARATOR,
@@ -133,7 +131,7 @@ public class TestChangelogDataReadWrite {
KeyValueFileReaderFactory.builder(
LocalFileIO.create(),
schemaManager,
- schemaId,
+ schemaManager.schema(0),
KEY_TYPE,
VALUE_TYPE,
ignore -> avro,
@@ -177,11 +175,12 @@ public class TestChangelogDataReadWrite {
Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
pathFactoryMap.put("avro", pathFactory);
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
RecordWriter<KeyValue> writer =
new KeyValueFileStoreWrite(
LocalFileIO.create(),
- new SchemaManager(LocalFileIO.create(), tablePath),
- 0,
+ schemaManager,
+ schemaManager.schema(0),
commitUser,
KEY_TYPE,
VALUE_TYPE,