This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new abaa59f46 [core] Reduce schema read for readers (#3021)
abaa59f46 is described below

commit abaa59f460268bc35151170fd48381fb0ddc54ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 18 10:17:31 2024 +0800

    [core] Reduce schema read for readers (#3021)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  7 +++---
 .../org/apache/paimon/AppendOnlyFileStore.java     | 11 ++++----
 .../java/org/apache/paimon/KeyValueFileStore.java  | 14 +++++------
 .../paimon/io/KeyValueFileReaderFactory.java       | 25 ++++++++++---------
 .../paimon/operation/AbstractFileStoreScan.java    |  6 ++++-
 .../paimon/operation/AppendOnlyFileStoreRead.java  | 15 ++++++-----
 .../paimon/operation/AppendOnlyFileStoreScan.java  |  6 +++--
 .../paimon/operation/KeyValueFileStoreRead.java    |  6 ++---
 .../paimon/operation/KeyValueFileStoreScan.java    | 10 +++++---
 .../paimon/operation/KeyValueFileStoreWrite.java   |  7 +++---
 .../paimon/table/AppendOnlyFileStoreTable.java     |  2 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  2 +-
 .../test/java/org/apache/paimon/TestFileStore.java | 29 ++++++++++++++++++----
 .../paimon/io/KeyValueFileReadWriteTest.java       |  2 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |  2 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  2 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |  2 +-
 .../apache/paimon/operation/FileDeletionTest.java  | 19 ++++++++------
 .../paimon/operation/FileStoreCommitTest.java      | 23 +++++++++--------
 .../paimon/operation/FileStoreExpireTestBase.java  |  3 ++-
 .../operation/KeyValueFileStoreReadTest.java       |  8 +++---
 .../operation/KeyValueFileStoreScanTest.java       |  3 ++-
 .../flink/source/TestChangelogDataReadWrite.java   | 11 ++++----
 23 files changed, 129 insertions(+), 86 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 6413cd088..87cc4e65c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -34,6 +34,7 @@ import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFile;
 import org.apache.paimon.stats.StatsFileHandler;
@@ -65,7 +66,7 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
 
     protected final FileIO fileIO;
     protected final SchemaManager schemaManager;
-    protected final long schemaId;
+    protected final TableSchema schema;
     protected final CoreOptions options;
     protected final RowType partitionType;
     private final CatalogEnvironment catalogEnvironment;
@@ -75,13 +76,13 @@ public abstract class AbstractFileStore<T> implements FileStore<T> {
     public AbstractFileStore(
             FileIO fileIO,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             CoreOptions options,
             RowType partitionType,
             CatalogEnvironment catalogEnvironment) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
-        this.schemaId = schemaId;
+        this.schema = schema;
         this.options = options;
         this.partitionType = partitionType;
         this.catalogEnvironment = catalogEnvironment;
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 8be8f8178..0d546e215 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -28,6 +28,7 @@ import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.operation.ScanBucketFilter;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
@@ -50,14 +51,14 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
     public AppendOnlyFileStore(
             FileIO fileIO,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             CoreOptions options,
             RowType partitionType,
             RowType bucketKeyType,
             RowType rowType,
             String tableName,
             CatalogEnvironment catalogEnvironment) {
-        super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
+        super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
         this.bucketKeyType = bucketKeyType;
         this.rowType = rowType;
         this.tableName = tableName;
@@ -82,7 +83,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
         return new AppendOnlyFileStoreRead(
                 fileIO,
                 schemaManager,
-                schemaId,
+                schema,
                 rowType,
                 FileFormatDiscover.of(options),
                 pathFactory());
@@ -99,7 +100,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
         return new AppendOnlyFileStoreWrite(
                 fileIO,
                 newRead(),
-                schemaId,
+                schema.id(),
                 commitUser,
                 rowType,
                 pathFactory(),
@@ -138,7 +139,7 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
                 bucketFilter,
                 snapshotManager(),
                 schemaManager,
-                schemaId,
+                schema,
                 manifestFileFactory(forWrite),
                 manifestListFactory(forWrite),
                 options.bucket(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a44c91ba5..d80bd6d39 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -35,6 +35,7 @@ import org.apache.paimon.operation.ScanBucketFilter;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.types.RowType;
@@ -75,7 +76,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
     public KeyValueFileStore(
             FileIO fileIO,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             boolean crossPartitionUpdate,
             CoreOptions options,
             RowType partitionType,
@@ -86,7 +87,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
             MergeFunctionFactory<KeyValue> mfFactory,
             String tableName,
             CatalogEnvironment catalogEnvironment) {
-        super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment);
+        super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
         this.crossPartitionUpdate = crossPartitionUpdate;
         this.bucketKeyType = bucketKeyType;
         this.keyType = keyType;
@@ -121,8 +122,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
     public KeyValueFileStoreRead newRead() {
         return new KeyValueFileStoreRead(
                 options,
-                schemaManager,
-                schemaId,
+                schema,
                 keyType,
                 valueType,
                 newKeyComparator(),
@@ -134,7 +134,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
         return KeyValueFileReaderFactory.builder(
                 fileIO,
                 schemaManager,
-                schemaId,
+                schema,
                 keyType,
                 valueType,
                 FileFormatDiscover.of(options),
@@ -162,7 +162,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
         return new KeyValueFileStoreWrite(
                 fileIO,
                 schemaManager,
-                schemaId,
+                schema,
                 commitUser,
                 keyType,
                 valueType,
@@ -221,7 +221,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 bucketFilter,
                 snapshotManager(),
                 schemaManager,
-                schemaId,
+                schema,
                 keyValueFieldsExtractor,
                 manifestFileFactory(forWrite),
                 manifestListFactory(forWrite),
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 63fef31fc..cc7534e9a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -31,6 +31,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.AsyncRecordReader;
 import org.apache.paimon.utils.BulkFormatMapping;
@@ -52,7 +53,7 @@ public class KeyValueFileReaderFactory {
 
     private final FileIO fileIO;
     private final SchemaManager schemaManager;
-    private final long schemaId;
+    private final TableSchema schema;
     private final RowType keyType;
     private final RowType valueType;
 
@@ -67,7 +68,7 @@ public class KeyValueFileReaderFactory {
     private KeyValueFileReaderFactory(
             FileIO fileIO,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             RowType keyType,
             RowType valueType,
             BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder,
@@ -77,7 +78,7 @@ public class KeyValueFileReaderFactory {
             DeletionVector.Factory dvFactory) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
-        this.schemaId = schemaId;
+        this.schema = schema;
         this.keyType = keyType;
         this.valueType = valueType;
         this.bulkFormatMappingBuilder = bulkFormatMappingBuilder;
@@ -110,8 +111,8 @@ public class KeyValueFileReaderFactory {
                 () ->
                         bulkFormatMappingBuilder.build(
                                 formatIdentifier,
-                                schemaManager.schema(this.schemaId),
-                                schemaManager.schema(schemaId));
+                                schema,
+                                schemaId == schema.id() ? schema : schemaManager.schema(schemaId));
 
         BulkFormatMapping bulkFormatMapping =
                 reuseFormat
@@ -141,7 +142,7 @@ public class KeyValueFileReaderFactory {
     public static Builder builder(
             FileIO fileIO,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             RowType keyType,
             RowType valueType,
             FileFormatDiscover formatDiscover,
@@ -151,7 +152,7 @@ public class KeyValueFileReaderFactory {
         return new Builder(
                 fileIO,
                 schemaManager,
-                schemaId,
+                schema,
                 keyType,
                 valueType,
                 formatDiscover,
@@ -165,7 +166,7 @@ public class KeyValueFileReaderFactory {
 
         private final FileIO fileIO;
         private final SchemaManager schemaManager;
-        private final long schemaId;
+        private final TableSchema schema;
         private final RowType keyType;
         private final RowType valueType;
         private final FileFormatDiscover formatDiscover;
@@ -182,7 +183,7 @@ public class KeyValueFileReaderFactory {
         private Builder(
                 FileIO fileIO,
                 SchemaManager schemaManager,
-                long schemaId,
+                TableSchema schema,
                 RowType keyType,
                 RowType valueType,
                 FileFormatDiscover formatDiscover,
@@ -191,7 +192,7 @@ public class KeyValueFileReaderFactory {
                 CoreOptions options) {
             this.fileIO = fileIO;
             this.schemaManager = schemaManager;
-            this.schemaId = schemaId;
+            this.schema = schema;
             this.keyType = keyType;
             this.valueType = valueType;
             this.formatDiscover = formatDiscover;
@@ -209,7 +210,7 @@ public class KeyValueFileReaderFactory {
             return new Builder(
                     fileIO,
                     schemaManager,
-                    schemaId,
+                    schema,
                     keyType,
                     valueType,
                     formatDiscover,
@@ -255,7 +256,7 @@ public class KeyValueFileReaderFactory {
             return new KeyValueFileReaderFactory(
                     fileIO,
                     schemaManager,
-                    schemaId,
+                    schema,
                     projectedKeyType,
                     projectedValueType,
                     BulkFormatMapping.newBuilder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index e6c95e885..52983f4b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -72,6 +72,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
+    private final TableSchema schema;
     protected final ScanBucketFilter bucketKeyFilter;
     private final String branchName;
 
@@ -91,6 +92,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             ScanBucketFilter bucketKeyFilter,
             SnapshotManager snapshotManager,
             SchemaManager schemaManager,
+            TableSchema schema,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
@@ -101,6 +103,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         this.bucketKeyFilter = bucketKeyFilter;
         this.snapshotManager = snapshotManager;
         this.schemaManager = schemaManager;
+        this.schema = schema;
         this.manifestFileFactory = manifestFileFactory;
         this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
@@ -407,7 +410,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     /** Note: Keep this thread-safe. */
     protected TableSchema scanTableSchema(long id) {
-        return tableSchemas.computeIfAbsent(id, key -> schemaManager.schema(id));
+        return tableSchemas.computeIfAbsent(
+                id, key -> key == schema.id() ? schema : schemaManager.schema(id));
     }
 
     /** Note: Keep this thread-safe. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index c06cce458..8363c297a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -62,7 +62,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
 
     private final FileIO fileIO;
     private final SchemaManager schemaManager;
-    private final long schemaId;
+    private final TableSchema schema;
     private final FileFormatDiscover formatDiscover;
     private final FileStorePathFactory pathFactory;
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
@@ -74,13 +74,13 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
     public AppendOnlyFileStoreRead(
             FileIO fileIO,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             RowType rowType,
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
-        this.schemaId = schemaId;
+        this.schema = schema;
         this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
         this.bulkFormatMappings = new HashMap<>();
@@ -113,8 +113,11 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
                     bulkFormatMappings.computeIfAbsent(
                             new FormatKey(file.schemaId(), formatIdentifier),
                             key -> {
-                                TableSchema tableSchema = schemaManager.schema(this.schemaId);
-                                TableSchema dataSchema = schemaManager.schema(key.schemaId);
+                                TableSchema tableSchema = schema;
+                                TableSchema dataSchema =
+                                        key.schemaId == schema.id()
+                                                ? schema
+                                                : schemaManager.schema(key.schemaId);
 
                                 // projection to data schema
                                 int[][] dataProjection =
@@ -131,7 +134,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
                                                 dataSchema.fields());
 
                                 List<Predicate> dataFilters =
-                                        this.schemaId == key.schemaId
+                                        this.schema.id() == key.schemaId
                                                 ? filters
                                                 : SchemaEvolutionUtil.createDataFilters(
                                                         tableSchema.fields(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 2cec4e064..866c87d75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -24,6 +24,7 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.stats.FieldStatsConverters;
@@ -44,7 +45,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
             ScanBucketFilter bucketFilter,
             SnapshotManager snapshotManager,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
@@ -56,6 +57,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
                 bucketFilter,
                 snapshotManager,
                 schemaManager,
+                schema,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
@@ -63,7 +65,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
                 scanManifestParallelism,
                 branchName);
         this.fieldStatsConverters =
-                new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId);
+                new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
     }
 
     public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index db4c0ac92..0e115fddd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -41,7 +41,6 @@ import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
@@ -89,14 +88,13 @@ public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
 
     public KeyValueFileStoreRead(
             CoreOptions options,
-            SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             RowType keyType,
             RowType valueType,
             Comparator<InternalRow> keyComparator,
             MergeFunctionFactory<KeyValue> mfFactory,
             KeyValueFileReaderFactory.Builder readerFactoryBuilder) {
-        this.tableSchema = schemaManager.schema(schemaId);
+        this.tableSchema = schema;
         this.readerFactoryBuilder = readerFactoryBuilder;
         this.fileIO = readerFactoryBuilder.fileIO();
         this.keyComparator = keyComparator;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 8e37d1e4f..0f34cac5a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.stats.FieldStatsConverters;
@@ -50,7 +51,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
             ScanBucketFilter bucketFilter,
             SnapshotManager snapshotManager,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             KeyValueFieldsExtractor keyValueFieldsExtractor,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
@@ -64,6 +65,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                 bucketFilter,
                 snapshotManager,
                 schemaManager,
+                schema,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
@@ -72,10 +74,12 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                 branchName);
         this.fieldKeyStatsConverters =
                 new FieldStatsConverters(
-                        sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId);
+                        sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
+                        schema.id());
         this.fieldValueStatsConverters =
                 new FieldStatsConverters(
-                        sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schemaId);
+                        sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)),
+                        schema.id());
         this.deletionVectorsEnabled = deletionVectorsEnabled;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 41b77d612..e06ed9ea2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -60,6 +60,7 @@ import org.apache.paimon.mergetree.compact.UniversalCompaction;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FieldsComparator;
@@ -102,7 +103,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     public KeyValueFileStoreWrite(
             FileIO fileIO,
             SchemaManager schemaManager,
-            long schemaId,
+            TableSchema schema,
             String commitUser,
             RowType keyType,
             RowType valueType,
@@ -135,7 +136,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                 KeyValueFileReaderFactory.builder(
                         fileIO,
                         schemaManager,
-                        schemaId,
+                        schema,
                         keyType,
                         valueType,
                         FileFormatDiscover.of(options),
@@ -145,7 +146,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
         this.writerFactoryBuilder =
                 KeyValueFileWriterFactory.builder(
                         fileIO,
-                        schemaId,
+                        schema.id(),
                         keyType,
                         valueType,
                         options.fileFormat(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 9c97d406c..4d91328f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -77,7 +77,7 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
                     new AppendOnlyFileStore(
                             fileIO,
                             schemaManager(),
-                            tableSchema.id(),
+                            tableSchema,
                             new CoreOptions(tableSchema.options()),
                             tableSchema.logicalPartitionType(),
                             tableSchema.logicalBucketKeyType(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 7b30fb832..f35afc64d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -98,7 +98,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
                     new KeyValueFileStore(
                             fileIO(),
                             schemaManager(),
-                            tableSchema.id(),
+                            tableSchema,
                             tableSchema.crossPartitionUpdate(),
                             options,
                             tableSchema.logicalPartitionType(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index d9cc7f878..43e0297fd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -44,6 +44,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.ExpireSnapshots;
 import org.apache.paimon.table.ExpireSnapshotsImpl;
@@ -103,11 +104,21 @@ public class TestFileStore extends KeyValueFileStore {
             RowType keyType,
             RowType valueType,
             KeyValueFieldsExtractor keyValueFieldsExtractor,
-            MergeFunctionFactory<KeyValue> mfFactory) {
+            MergeFunctionFactory<KeyValue> mfFactory,
+            TableSchema tableSchema) {
         super(
                 FileIOFinder.find(new Path(root)),
-                new SchemaManager(FileIOFinder.find(new Path(root)), options.path()),
-                0L,
+                schemaManager(root, options),
+                tableSchema != null
+                        ? tableSchema
+                        : new TableSchema(
+                                0L,
+                                valueType.getFields(),
+                                valueType.getFieldCount(),
+                                partitionType.getFieldNames(),
+                                keyType.getFieldNames(),
+                                Collections.emptyMap(),
+                                null),
                 false,
                 options,
                 partitionType,
@@ -127,6 +138,10 @@ public class TestFileStore extends KeyValueFileStore {
         this.commitIdentifier = 0L;
     }
 
+    private static SchemaManager schemaManager(String root, CoreOptions options) {
+        return new SchemaManager(FileIOFinder.find(new Path(root)), options.path());
+    }
+
     public AbstractFileStoreWrite<KeyValue> newWrite() {
         return super.newWrite(commitUser);
     }
@@ -563,6 +578,7 @@ public class TestFileStore extends KeyValueFileStore {
         private final RowType valueType;
         private final KeyValueFieldsExtractor keyValueFieldsExtractor;
         private final MergeFunctionFactory<KeyValue> mfFactory;
+        private final TableSchema tableSchema;
 
         private CoreOptions.ChangelogProducer changelogProducer;
 
@@ -574,7 +590,8 @@ public class TestFileStore extends KeyValueFileStore {
                 RowType keyType,
                 RowType valueType,
                 KeyValueFieldsExtractor keyValueFieldsExtractor,
-                MergeFunctionFactory<KeyValue> mfFactory) {
+                MergeFunctionFactory<KeyValue> mfFactory,
+                TableSchema tableSchema) {
             this.format = format;
             this.root = root;
             this.numBuckets = numBuckets;
@@ -583,6 +600,7 @@ public class TestFileStore extends KeyValueFileStore {
             this.valueType = valueType;
             this.keyValueFieldsExtractor = keyValueFieldsExtractor;
             this.mfFactory = mfFactory;
+            this.tableSchema = tableSchema;
 
             this.changelogProducer = CoreOptions.ChangelogProducer.NONE;
         }
@@ -620,7 +638,8 @@ public class TestFileStore extends KeyValueFileStore {
                     keyType,
                     valueType,
                     keyValueFieldsExtractor,
-                    mfFactory);
+                    mfFactory,
+                    tableSchema);
         }
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 05f260097..ca7f75d6e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -282,7 +282,7 @@ public class KeyValueFileReadWriteTest {
                 KeyValueFileReaderFactory.builder(
                         fileIO,
                         createTestSchemaManager(path),
-                        0,
+                        createTestSchemaManager(path).schema(0),
                         KEY_TYPE,
                         DEFAULT_ROW_TYPE,
                         ignore -> new FlushingFileFormat(format),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 58d9dbe90..d53b94155 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -235,7 +235,7 @@ public class ContainsLevelsTest {
                 KeyValueFileReaderFactory.builder(
                         FileIOFinder.find(path),
                         createSchemaManager(path),
-                        0,
+                        createSchemaManager(path).schema(0),
                         keyType,
                         rowType,
                         ignore -> new FlushingFileFormat("avro"),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 00d8eeb5a..f4d27784a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -315,7 +315,7 @@ public class LookupLevelsTest {
                 KeyValueFileReaderFactory.builder(
                         FileIOFinder.find(path),
                         createSchemaManager(path),
-                        0,
+                        createSchemaManager(path).schema(0),
                         keyType,
                         rowType,
                         ignore -> new FlushingFileFormat("avro"),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index f6ddf74ea..a3bcc9bfa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -147,7 +147,7 @@ public abstract class MergeTreeTestBase {
                 KeyValueFileReaderFactory.builder(
                         LocalFileIO.create(),
                         createTestingSchemaManager(path),
-                        0,
+                        createTestingSchemaManager(path).schema(0),
                         keyType,
                         valueType,
                         ignore -> flushingAvro,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 49e55ee4e..9994c0809 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.RecordWriter;
@@ -664,13 +665,14 @@ public class FileDeletionTest {
         }
 
         SchemaManager schemaManager = new SchemaManager(fileIO, new Path(root));
-        schemaManager.createTable(
-                new Schema(
-                        rowType.getFields(),
-                        partitionType.getFieldNames(),
-                        TestKeyValueGenerator.getPrimaryKeys(mode),
-                        Collections.emptyMap(),
-                        null));
+        TableSchema tableSchema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                partitionType.getFieldNames(),
+                                TestKeyValueGenerator.getPrimaryKeys(mode),
+                                Collections.emptyMap(),
+                                null));
 
         return new TestFileStore.Builder(
                         "avro",
@@ -680,7 +682,8 @@ public class FileDeletionTest {
                         TestKeyValueGenerator.KEY_TYPE,
                         rowType,
                         TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
-                        DeduplicateMergeFunction.factory())
+                        DeduplicateMergeFunction.factory(),
+                        tableSchema)
                 .changelogProducer(changelogProducer)
                 .build();
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 488f74f67..b0cec3f44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -35,6 +35,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.ColStats;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.stats.StatsFileHandler;
@@ -856,15 +857,16 @@ public class FileStoreCommitTest {
                         ? FailingFileIO.getFailingPath(failingName, tempDir.toString())
                         : TraceableFileIO.SCHEME + "://" + tempDir.toString();
         Path path = new Path(tempDir.toUri());
-        SchemaUtils.forceCommit(
-                new SchemaManager(new LocalFileIO(), path),
-                new Schema(
-                        TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
-                        TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
-                        TestKeyValueGenerator.getPrimaryKeys(
-                                TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
-                        Collections.emptyMap(),
-                        null));
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(new LocalFileIO(), path),
+                        new Schema(
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                                TestKeyValueGenerator.getPrimaryKeys(
+                                        TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                                Collections.emptyMap(),
+                                null));
         return new TestFileStore.Builder(
                         "avro",
                         root,
@@ -873,7 +875,8 @@ public class FileStoreCommitTest {
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
-                        DeduplicateMergeFunction.factory())
+                        DeduplicateMergeFunction.factory(),
+                        tableSchema)
                 .changelogProducer(changelogProducer)
                 .build();
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
index d9b3469f9..5ebe76ed6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
@@ -86,7 +86,8 @@ public abstract class FileStoreExpireTestBase {
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                         TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
-                        DeduplicateMergeFunction.factory())
+                        DeduplicateMergeFunction.factory(),
+                        null)
                 .changelogProducer(changelogProducer)
                 .build();
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
index a3c0142e7..07bf705e2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
@@ -268,7 +268,7 @@ public class KeyValueFileStoreReadTest {
         Path path = new Path(tempDir.toUri());
         SchemaManager schemaManager = new SchemaManager(FileIOFinder.find(path), path);
         boolean valueCountMode = mfFactory.create() instanceof TestValueCountMergeFunction;
-        schemaManager.createTable(
+        Schema schema =
                 new Schema(
                         (valueCountMode ? keyType : valueType).getFields(),
                         partitionType.getFieldNames(),
@@ -280,7 +280,8 @@ public class KeyValueFileStoreReadTest {
                                                 partitionType.getFieldNames().stream())
                                         .collect(Collectors.toList()),
                         Collections.emptyMap(),
-                        null));
+                        null);
+        TableSchema tableSchema = schemaManager.createTable(schema);
         return new TestFileStore.Builder(
                         "avro",
                         tempDir.toString(),
@@ -289,7 +290,8 @@ public class KeyValueFileStoreReadTest {
                         keyType,
                         valueType,
                         extractor,
-                        mfFactory)
+                        mfFactory,
+                        tableSchema)
                 .build();
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index 74dee018e..7f1fc0d3c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -76,7 +76,8 @@ public class KeyValueFileStoreScanTest {
                                 TestKeyValueGenerator.KEY_TYPE,
                                 TestKeyValueGenerator.DEFAULT_ROW_TYPE,
                                 TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
-                                DeduplicateMergeFunction.factory())
+                                DeduplicateMergeFunction.factory(),
+                                null)
                         .build();
         snapshotManager = store.snapshotManager();
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 7ffa94809..d51aefb96 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -119,13 +119,11 @@ public class TestChangelogDataReadWrite {
                             RecordReader.RecordIterator<InternalRow>>
                     rowDataIteratorCreator) {
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
-        long schemaId = 0;
         CoreOptions options = new CoreOptions(new HashMap<>());
         KeyValueFileStoreRead read =
                 new KeyValueFileStoreRead(
                         options,
-                        schemaManager,
-                        schemaId,
+                        schemaManager.schema(0),
                         KEY_TYPE,
                         VALUE_TYPE,
                         COMPARATOR,
@@ -133,7 +131,7 @@ public class TestChangelogDataReadWrite {
                         KeyValueFileReaderFactory.builder(
                                 LocalFileIO.create(),
                                 schemaManager,
-                                schemaId,
+                                schemaManager.schema(0),
                                 KEY_TYPE,
                                 VALUE_TYPE,
                                 ignore -> avro,
@@ -177,11 +175,12 @@ public class TestChangelogDataReadWrite {
 
         Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
         pathFactoryMap.put("avro", pathFactory);
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
         RecordWriter<KeyValue> writer =
                 new KeyValueFileStoreWrite(
                                 LocalFileIO.create(),
-                                new SchemaManager(LocalFileIO.create(), tablePath),
-                                0,
+                                schemaManager,
+                                schemaManager.schema(0),
                                 commitUser,
                                 KEY_TYPE,
                                 VALUE_TYPE,


Reply via email to