This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 157966f6 [FLINK-27843] Schema evolution for data file meta
157966f6 is described below

commit 157966f696522f3c7e939c4cd3713b01c9081017
Author: shammon <[email protected]>
AuthorDate: Fri Nov 18 11:58:29 2022 +0800

    [FLINK-27843] Schema evolution for data file meta
    
    This closes #376
---
 .../table/store/file/AppendOnlyFileStore.java      |   2 +
 .../flink/table/store/file/KeyValueFileStore.java  |   7 +
 .../file/operation/AbstractFileStoreScan.java      |  21 +
 .../file/operation/AppendOnlyFileStoreScan.java    |  32 +-
 .../file/operation/KeyValueFileStoreScan.java      |  41 +-
 .../store/file/schema/KeyFieldsExtractor.java      |  33 ++
 .../flink/table/store/file/schema/RowDataType.java |   2 +-
 .../store/file/schema/SchemaEvolutionUtil.java     |  65 +++
 .../flink/table/store/file/schema/TableSchema.java |  19 +-
 .../file/stats/FieldStatsArraySerializer.java      |  18 +-
 .../table/ChangelogValueCountFileStoreTable.java   |  24 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |  35 +-
 .../flink/table/store/file/TestFileStore.java      |  14 +-
 .../table/store/file/TestKeyValueGenerator.java    |  20 +
 .../table/store/file/io/DataFileTestUtils.java     |  10 +
 .../store/file/operation/FileStoreCommitTest.java  |   1 +
 .../store/file/operation/FileStoreExpireTest.java  |   1 +
 .../file/operation/KeyValueFileStoreReadTest.java  |  26 +-
 .../file/operation/KeyValueFileStoreScanTest.java  |   1 +
 .../store/file/schema/SchemaEvolutionUtilTest.java |  56 ++
 .../file/stats/FieldStatsArraySerializerTest.java  | 120 ++++
 .../table/AppendOnlyTableFileMetaFilterTest.java   |  51 ++
 .../ChangelogValueCountFileMetaFilterTest.java     |  59 ++
 .../table/ChangelogWithKeyFileMetaFilterTest.java  | 137 +++++
 .../table/store/table/FileMetaFilterTestBase.java  | 603 +++++++++++++++++++++
 25 files changed, 1373 insertions(+), 25 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 18cc6b36..0d889213 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -76,6 +76,8 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
                 bucketKeyType.getFieldCount() == 0 ? rowType : bucketKeyType,
                 rowType,
                 snapshotManager(),
+                schemaManager,
+                schemaId,
                 manifestFileFactory(),
                 manifestListFactory(),
                 options.bucket(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 7f7602b6..29fe95dd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
 import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +40,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
     private final RowType bucketKeyType;
     private final RowType keyType;
     private final RowType valueType;
+    private final KeyFieldsExtractor keyFieldsExtractor;
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction<KeyValue> mergeFunction;
 
@@ -50,11 +52,13 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
             RowType bucketKeyType,
             RowType keyType,
             RowType valueType,
+            KeyFieldsExtractor keyFieldsExtractor,
             MergeFunction<KeyValue> mergeFunction) {
         super(schemaManager, schemaId, options, partitionType);
         this.bucketKeyType = bucketKeyType;
         this.keyType = keyType;
         this.valueType = valueType;
+        this.keyFieldsExtractor = keyFieldsExtractor;
         this.mergeFunction = mergeFunction;
         this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
     }
@@ -99,6 +103,9 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 bucketKeyType,
                 keyType,
                 snapshotManager(),
+                schemaManager,
+                schemaId,
+                keyFieldsExtractor,
                 manifestFileFactory(),
                 manifestListFactory(),
                 options.bucket(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index acb4be11..bd8af8aa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.predicate.BucketSelector;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
@@ -41,7 +43,9 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -59,6 +63,10 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private final boolean checkNumOfBuckets;
     private final CoreOptions.ChangelogProducer changelogProducer;
 
+    private final Map<Long, TableSchema> tableSchemas;
+    private final SchemaManager schemaManager;
+    private final long schemaId;
+
     private Predicate partitionFilter;
     private BucketSelector bucketSelector;
 
@@ -72,6 +80,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
             RowType partitionType,
             RowType bucketKeyType,
             SnapshotManager snapshotManager,
+            SchemaManager schemaManager,
+            long schemaId,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
@@ -83,11 +93,14 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                 bucketKeyType.getFieldCount() > 0, "The bucket keys should not be empty.");
         this.bucketKeyType = bucketKeyType;
         this.snapshotManager = snapshotManager;
+        this.schemaManager = schemaManager;
+        this.schemaId = schemaId;
         this.manifestFileFactory = manifestFileFactory;
         this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.changelogProducer = changelogProducer;
+        this.tableSchemas = new HashMap<>();
     }
 
     @Override
@@ -244,6 +257,14 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
         };
     }
 
+    protected TableSchema scanTableSchema() {
+        return scanTableSchema(this.schemaId);
+    }
+
+    protected TableSchema scanTableSchema(long id) {
+        return tableSchemas.computeIfAbsent(id, key -> schemaManager.schema(id));
+    }
+
     private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
         return partitionFilter == null
                 || partitionFilter.test(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index bdc0389b..717d3a79 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -23,11 +23,16 @@ import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
@@ -36,7 +41,7 @@ import static org.apache.flink.table.store.file.predicate.PredicateBuilder.split
 /** {@link FileStoreScan} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
 
-    private final FieldStatsArraySerializer rowStatsConverter;
+    private final Map<Long, FieldStatsArraySerializer> schemaRowStatsConverters;
     private final RowType rowType;
 
     private Predicate filter;
@@ -46,6 +51,8 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
             RowType bucketKeyType,
             RowType rowType,
             SnapshotManager snapshotManager,
+            SchemaManager schemaManager,
+            long schemaId,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
@@ -54,12 +61,14 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
                 partitionType,
                 bucketKeyType,
                 snapshotManager,
+                schemaManager,
+                schemaId,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
                 CoreOptions.ChangelogProducer.NONE);
-        this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
+        this.schemaRowStatsConverters = new HashMap<>();
         this.rowType = rowType;
     }
 
@@ -84,6 +93,23 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
                         entry.file().rowCount(),
                         entry.file()
                                 .valueStats()
-                                .fields(rowStatsConverter, entry.file().rowCount()));
+                                .fields(
+                                        getFieldStatsArraySerializer(entry.file().schemaId()),
+                                        entry.file().rowCount()));
+    }
+
+    private FieldStatsArraySerializer getFieldStatsArraySerializer(long schemaId) {
+        return schemaRowStatsConverters.computeIfAbsent(
+                schemaId,
+                id -> {
+                    TableSchema tableSchema = scanTableSchema();
+                    TableSchema schema = scanTableSchema(id);
+                    return new FieldStatsArraySerializer(
+                            schema.logicalRowType(),
+                            tableSchema.id() == id
+                                    ? null
+                                    : SchemaEvolutionUtil.createIndexMapping(
+                                            tableSchema.fields(), schema.fields()));
+                });
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index bdfac434..5dc88a45 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -23,11 +23,19 @@ import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.RowDataType;
+import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
@@ -36,7 +44,8 @@ import static org.apache.flink.table.store.file.predicate.PredicateBuilder.split
 /** {@link FileStoreScan} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
 public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
-    private final FieldStatsArraySerializer keyStatsConverter;
+    private final Map<Long, FieldStatsArraySerializer> schemaKeyStatsConverters;
+    private final KeyFieldsExtractor keyFieldsExtractor;
     private final RowType keyType;
 
     private Predicate keyFilter;
@@ -46,6 +55,9 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
             RowType bucketKeyType,
             RowType keyType,
             SnapshotManager snapshotManager,
+            SchemaManager schemaManager,
+            long schemaId,
+            KeyFieldsExtractor keyFieldsExtractor,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
@@ -55,12 +67,15 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                 partitionType,
                 bucketKeyType,
                 snapshotManager,
+                schemaManager,
+                schemaId,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
                 changelogProducer);
-        this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
+        this.keyFieldsExtractor = keyFieldsExtractor;
+        this.schemaKeyStatsConverters = new HashMap<>();
         this.keyType = keyType;
     }
 
@@ -83,6 +98,26 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
         return keyFilter == null
                 || keyFilter.test(
                         entry.file().rowCount(),
-                        entry.file().keyStats().fields(keyStatsConverter, entry.file().rowCount()));
+                        entry.file()
+                                .keyStats()
+                                .fields(
+                                        getFieldStatsArraySerializer(entry.file().schemaId()),
+                                        entry.file().rowCount()));
+    }
+
+    private FieldStatsArraySerializer getFieldStatsArraySerializer(long id) {
+        return schemaKeyStatsConverters.computeIfAbsent(
+                id,
+                key -> {
+                    final TableSchema tableSchema = scanTableSchema();
+                    final TableSchema schema = scanTableSchema(key);
+                    final List<DataField> keyFields = keyFieldsExtractor.keyFields(schema);
+                    return new FieldStatsArraySerializer(
+                            RowDataType.toRowType(false, keyFields),
+                            tableSchema.id() == key
+                                    ? null
+                                    : SchemaEvolutionUtil.createIndexMapping(
+                                            keyFieldsExtractor.keyFields(tableSchema), keyFields));
+                });
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/KeyFieldsExtractor.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/KeyFieldsExtractor.java
new file mode 100644
index 00000000..937ed160
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/KeyFieldsExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Extracts the key fields from a table schema; each table type provides its own implementation. */
+public interface KeyFieldsExtractor extends Serializable {
+    /**
+     * Extract key fields from table schema.
+     *
+     * @param schema the table schema
+     * @return the key fields
+     */
+    List<DataField> keyFields(TableSchema schema);
+}
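
Note: a hedged sketch, not part of the commit, of what a KeyFieldsExtractor implementation looks like; the value-count and changelog-with-key extractors added further down in this patch follow the same pattern.

    // Sketch only: a table that treats every column of the schema as a key field.
    KeyFieldsExtractor allColumnsAsKeys =
            new KeyFieldsExtractor() {
                private static final long serialVersionUID = 1L;

                @Override
                public List<DataField> keyFields(TableSchema schema) {
                    return schema.fields();
                }
            };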
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
index 081b4b88..94916d13 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
@@ -40,7 +40,7 @@ public class RowDataType extends DataType {
         this.fields = fields;
     }
 
-    private static RowType toRowType(boolean isNullable, List<DataField> fields) {
+    public static RowType toRowType(boolean isNullable, List<DataField> fields) {
         List<RowType.RowField> typeFields = new ArrayList<>(fields.size());
         for (DataField field : fields) {
             typeFields.add(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
new file mode 100644
index 00000000..e8ee559b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for schema evolution. */
+public class SchemaEvolutionUtil {
+
+    private static final int NULL_FIELD_INDEX = -1;
+
+    /**
+     * Create index mapping from table fields to underlying data fields.
+     *
+     * @param tableFields the fields of table
+     * @param dataFields the fields of underlying data
+     * @return the index mapping
+     */
+    @Nullable
+    public static int[] createIndexMapping(
+            List<DataField> tableFields, List<DataField> dataFields) {
+        int[] indexMapping = new int[tableFields.size()];
+        Map<Integer, Integer> fieldIdToIndex = new HashMap<>();
+        for (int i = 0; i < dataFields.size(); i++) {
+            fieldIdToIndex.put(dataFields.get(i).id(), i);
+        }
+
+        for (int i = 0; i < tableFields.size(); i++) {
+            int fieldId = tableFields.get(i).id();
+            Integer dataFieldIndex = fieldIdToIndex.get(fieldId);
+            if (dataFieldIndex != null) {
+                indexMapping[i] = dataFieldIndex;
+            } else {
+                indexMapping[i] = NULL_FIELD_INDEX;
+            }
+        }
+
+        for (int i = 0; i < indexMapping.length; i++) {
+            if (indexMapping[i] != i) {
+                return indexMapping;
+            }
+        }
+        return null;
+    }
+}
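
Note: a worked illustration of createIndexMapping (the tableFields/dataFields lists and their field ids are assumptions for the example; SchemaEvolutionUtilTest below exercises the same case).

    // Data file written with fields:   id=0 "a", id=1 "b", id=2 "c", id=3 "d".
    // Current table schema has fields: id=1 "c", id=3 "a", id=5 "d", id=6 "e".
    // Matching by field id yields [1, 3, -1, -1]: the first two table columns
    // read data columns 1 and 3; ids 5 and 6 were added after the file was
    // written, so they fall back to NULL_FIELD_INDEX (-1). A null return means
    // the mapping is the identity and can be ignored.
    int[] indexMapping = SchemaEvolutionUtil.createIndexMapping(tableFields, dataFields);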
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index bed3d8eb..9ef955ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -195,20 +195,25 @@ public class TableSchema implements Serializable {
         return projectedLogicalRowType(trimmedPrimaryKeys());
     }
 
+    public List<DataField> trimmedPrimaryKeysFields() {
+        return projectedDataFields(trimmedPrimaryKeys());
+    }
+
     public int[] projection(List<String> projectedFieldNames) {
         List<String> fieldNames = fieldNames();
         return projectedFieldNames.stream().mapToInt(fieldNames::indexOf).toArray();
     }
 
-    private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
+    private List<DataField> projectedDataFields(List<String> projectedFieldNames) {
         List<String> fieldNames = fieldNames();
+        return projectedFieldNames.stream()
+                .map(k -> fields.get(fieldNames.indexOf(k)))
+                .collect(Collectors.toList());
+    }
+
+    private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
         return (RowType)
-                new RowDataType(
-                                false,
-                                projectedFieldNames.stream()
-                                        .map(k -> fields.get(fieldNames.indexOf(k)))
-                                        .collect(Collectors.toList()))
-                        .logicalType;
+                new RowDataType(false, projectedDataFields(projectedFieldNames)).logicalType;
     }
 
     public TableSchema copy(Map<String, String> newOptions) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 10d23e8b..3ce0bcf5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -43,7 +43,13 @@ public class FieldStatsArraySerializer {
 
     private final RowData.FieldGetter[] fieldGetters;
 
+    @Nullable private final int[] indexMapping;
+
     public FieldStatsArraySerializer(RowType type) {
+        this(type, null);
+    }
+
+    public FieldStatsArraySerializer(RowType type, int[] indexMapping) {
         RowType safeType = toAllFieldsNullableRowType(type);
         this.serializer = new RowDataSerializer(safeType);
         this.fieldGetters =
@@ -53,6 +59,7 @@ public class FieldStatsArraySerializer {
                                         RowDataUtils.createNullCheckingFieldGetter(
                                                 safeType.getTypeAt(i), i))
                         .toArray(RowData.FieldGetter[]::new);
+        this.indexMapping = indexMapping;
     }
 
     public BinaryTableStats toBinary(FieldStats[] stats) {
@@ -77,10 +84,11 @@ public class FieldStatsArraySerializer {
     }
 
     public FieldStats[] fromBinary(BinaryTableStats array, @Nullable Long rowCount) {
-        int fieldCount = fieldGetters.length;
+        int fieldCount = indexMapping == null ? fieldGetters.length : indexMapping.length;
         FieldStats[] stats = new FieldStats[fieldCount];
         for (int i = 0; i < fieldCount; i++) {
-            if (i >= array.min().getArity()) {
+            int fieldIndex = indexMapping == null ? i : indexMapping[i];
+            if (fieldIndex < 0 || fieldIndex >= array.min().getArity()) {
                 // simple evolution for add column
                 if (rowCount == null) {
                     throw new RuntimeException("Schema Evolution for stats needs row count.");
@@ -89,9 +97,9 @@ public class FieldStatsArraySerializer {
             } else {
                 stats[i] =
                         new FieldStats(
-                                fieldGetters[i].getFieldOrNull(array.min()),
-                                fieldGetters[i].getFieldOrNull(array.max()),
-                                array.nullCounts()[i]);
+                                fieldGetters[fieldIndex].getFieldOrNull(array.min()),
+                                fieldGetters[fieldIndex].getFieldOrNull(array.max()),
+                                array.nullCounts()[fieldIndex]);
             }
         }
         return stats;
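
Note: a hedged sketch of how the scans above wire the mapping into stats deserialization; dataSchema, tableSchema, fileStats and fileRowCount are assumptions for the example, and FieldStatsArraySerializerTest below covers the same path.

    // dataSchema is the schema the data file was written with, tableSchema the
    // current schema of the table (both would come from the SchemaManager).
    FieldStatsArraySerializer serializer =
            new FieldStatsArraySerializer(
                    tableSchema.logicalRowType(),
                    SchemaEvolutionUtil.createIndexMapping(
                            tableSchema.fields(), dataSchema.fields()));
    // Columns added after the file was written have no stored stats, so they
    // fall back to (null, null, rowCount), which is why fromBinary needs a
    // row count under schema evolution.
    FieldStats[] stats = fileStats.fields(serializer, fileRowCount);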
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index b96cbaa9..05294e60 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -29,6 +29,9 @@ import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -47,6 +50,8 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
+import java.util.List;
+
 /** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without primary keys. */
 public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
 
@@ -61,6 +66,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
                 RowType.of(
                         new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
         MergeFunction<KeyValue> mergeFunction = new ValueCountMergeFunction();
+        KeyFieldsExtractor extractor = ValueCountTableKeyFieldsExtractor.EXTRACTOR;
         this.store =
                 new KeyValueFileStore(
                         schemaManager,
@@ -68,8 +74,9 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
                         new CoreOptions(tableSchema.options()),
                         tableSchema.logicalPartitionType(),
                         tableSchema.logicalBucketKeyType(),
-                        tableSchema.logicalRowType(),
+                        RowDataType.toRowType(false, extractor.keyFields(tableSchema)),
                         countType,
+                        extractor,
                         mergeFunction);
     }
 
@@ -146,4 +153,19 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     public KeyValueFileStore store() {
         return store;
     }
+
+    /** {@link KeyFieldsExtractor} implementation for {@link ChangelogValueCountFileStoreTable}. */
+    static class ValueCountTableKeyFieldsExtractor implements KeyFieldsExtractor {
+        private static final long serialVersionUID = 1L;
+
+        static final ValueCountTableKeyFieldsExtractor EXTRACTOR =
+                new ValueCountTableKeyFieldsExtractor();
+
+        private ValueCountTableKeyFieldsExtractor() {}
+
+        @Override
+        public List<DataField> keyFields(TableSchema schema) {
+            return schema.fields();
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 79dff30a..baef2dfa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -31,6 +31,9 @@ import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFun
 import org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -101,6 +104,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
         }
 
         CoreOptions options = new CoreOptions(conf);
+        KeyFieldsExtractor extractor = ChangelogWithKeyKeyFieldsExtractor.EXTRACTOR;
         this.store =
                 new KeyValueFileStore(
                         schemaManager,
@@ -108,12 +112,13 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
                         options,
                         tableSchema.logicalPartitionType(),
                         addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
-                        addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()),
+                        RowDataType.toRowType(false, extractor.keyFields(tableSchema)),
                         rowType,
+                        extractor,
                         mergeFunction);
     }
 
-    private RowType addKeyNamePrefix(RowType type) {
+    private static RowType addKeyNamePrefix(RowType type) {
         // add prefix to avoid conflict with value
         return new RowType(
                 type.getFields().stream()
@@ -126,6 +131,18 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
                         .collect(Collectors.toList()));
     }
 
+    private static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
+        return keyFields.stream()
+                .map(
+                        f ->
+                                new DataField(
+                                        f.id(),
+                                        KEY_FIELD_PREFIX + f.name(),
+                                        f.type(),
+                                        f.description()))
+                .collect(Collectors.toList());
+    }
+
     @Override
     public DataTableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
@@ -215,4 +232,18 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     public KeyValueFileStore store() {
         return store;
     }
+
+    static class ChangelogWithKeyKeyFieldsExtractor implements KeyFieldsExtractor {
+        private static final long serialVersionUID = 1L;
+
+        static final ChangelogWithKeyKeyFieldsExtractor EXTRACTOR =
+                new ChangelogWithKeyKeyFieldsExtractor();
+
+        private ChangelogWithKeyKeyFieldsExtractor() {}
+
+        @Override
+        public List<DataField> keyFields(TableSchema schema) {
+            return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
+        }
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 09722d51..9d280801 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
 import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -92,6 +93,7 @@ public class TestFileStore extends KeyValueFileStore {
             RowType partitionType,
             RowType keyType,
             RowType valueType,
+            KeyFieldsExtractor keyFieldsExtractor,
             MergeFunction<KeyValue> mergeFunction) {
         super(
                 new SchemaManager(options.path()),
@@ -101,6 +103,7 @@ public class TestFileStore extends KeyValueFileStore {
                 keyType,
                 keyType,
                 valueType,
+                keyFieldsExtractor,
                 mergeFunction);
         this.root = root;
         this.keySerializer = new RowDataSerializer(keyType);
@@ -453,6 +456,7 @@ public class TestFileStore extends KeyValueFileStore {
         private final RowType partitionType;
         private final RowType keyType;
         private final RowType valueType;
+        private final KeyFieldsExtractor keyFieldsExtractor;
         private final MergeFunction<KeyValue> mergeFunction;
 
         private CoreOptions.ChangelogProducer changelogProducer;
@@ -464,6 +468,7 @@ public class TestFileStore extends KeyValueFileStore {
                 RowType partitionType,
                 RowType keyType,
                 RowType valueType,
+                KeyFieldsExtractor keyFieldsExtractor,
                 MergeFunction<KeyValue> mergeFunction) {
             this.format = format;
             this.root = root;
@@ -471,6 +476,7 @@ public class TestFileStore extends KeyValueFileStore {
             this.partitionType = partitionType;
             this.keyType = keyType;
             this.valueType = valueType;
+            this.keyFieldsExtractor = keyFieldsExtractor;
             this.mergeFunction = mergeFunction;
 
             this.changelogProducer = CoreOptions.ChangelogProducer.NONE;
@@ -500,7 +506,13 @@ public class TestFileStore extends KeyValueFileStore {
             conf.set(CoreOptions.CHANGELOG_PRODUCER, changelogProducer);
 
             return new TestFileStore(
-                    root, new CoreOptions(conf), partitionType, keyType, valueType, mergeFunction);
+                    root,
+                    new CoreOptions(conf),
+                    partitionType,
+                    keyType,
+                    valueType,
+                    keyFieldsExtractor,
+                    mergeFunction);
         }
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 90a02764..28c76294 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
@@ -104,6 +107,8 @@ public class TestKeyValueGenerator {
                     new String[] {"shopId", "orderId", "itemId", 
"priceAmount", "comment"});
     public static final RowType NON_PARTITIONED_PART_TYPE = RowType.of();
 
+    public static final List<String> KEY_NAME_LIST = Arrays.asList("shopId", "orderId");
+
     public static final RowType KEY_TYPE =
             RowType.of(
                     new LogicalType[] {new IntType(false), new BigIntType(false)},
@@ -323,4 +328,19 @@ public class TestKeyValueGenerator {
         SINGLE_PARTITIONED,
         MULTI_PARTITIONED
     }
+
+    /** {@link KeyFieldsExtractor} implementation for test. */
+    public static class TestKeyFieldsExtractor implements KeyFieldsExtractor {
+        private static final long serialVersionUID = 1L;
+
+        public static final TestKeyFieldsExtractor EXTRACTOR = new TestKeyFieldsExtractor();
+
+        @Override
+        public List<DataField> keyFields(TableSchema schema) {
+            return schema.fields().stream()
+                    .filter(f -> KEY_NAME_LIST.contains(f.name()))
+                    .map(f -> new DataField(f.id(), "key_" + f.name(), f.type(), f.description()))
+                    .collect(Collectors.toList());
+        }
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java
index c8480cb9..cbc0a36b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java
@@ -86,4 +86,14 @@ public class DataFileTestUtils {
         writer.complete();
         return row;
     }
+
+    public static BinaryRowData row(int... values) {
+        BinaryRowData row = new BinaryRowData(values.length);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        for (int i = 0; i < values.length; i++) {
+            writer.writeInt(i, values[i]);
+        }
+        writer.complete();
+        return row;
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 85b2733a..c24c71dc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -537,6 +537,7 @@ public class FileStoreCommitTest {
                         TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
                         new DeduplicateMergeFunction())
                 .changelogProducer(changelogProducer)
                 .build();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 90ac35ba..0c40fd28 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -92,6 +92,7 @@ public class FileStoreExpireTest {
                         TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
                         new DeduplicateMergeFunction())
                 .changelogProducer(changelogProducer)
                 .build();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index 1dcc2abe..d2ec2bec 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -29,7 +29,10 @@ import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
 import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -45,6 +48,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -90,12 +94,13 @@ public class KeyValueFileStoreReadTest {
         RowType partitionType =
                 RowType.of(new LogicalType[] {new IntType(false)}, new String[] {"c"});
         RowDataSerializer partitionSerializer = new RowDataSerializer(partitionType);
+        List<String> keyNames = Arrays.asList("a", "b", "c");
         RowType keyType =
                 RowType.of(
                         new LogicalType[] {
                             new IntType(false), new IntType(false), new IntType(false)
                         },
-                        new String[] {"a", "b", "c"});
+                        keyNames.toArray(new String[0]));
         RowType projectedKeyType = RowType.of(new IntType(false), new IntType(false));
         RowDataSerializer projectedKeySerializer = new RowDataSerializer(projectedKeyType);
         RowType valueType =
@@ -103,7 +108,21 @@ public class KeyValueFileStoreReadTest {
         RowDataSerializer valueSerializer = new RowDataSerializer(valueType);
 
         TestFileStore store =
-                createStore(partitionType, keyType, valueType, new ValueCountMergeFunction());
+                createStore(
+                        partitionType,
+                        keyType,
+                        valueType,
+                        new KeyFieldsExtractor() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public List<DataField> keyFields(TableSchema schema) {
+                                return schema.fields().stream()
+                                        .filter(f -> keyNames.contains(f.name()))
+                                        .collect(Collectors.toList());
+                            }
+                        },
+                        new ValueCountMergeFunction());
         List<KeyValue> readData =
                 writeThenRead(
                         data,
@@ -142,6 +161,7 @@ public class KeyValueFileStoreReadTest {
                         TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
                         new DeduplicateMergeFunction());
 
         RowDataSerializer projectedValueSerializer =
@@ -230,6 +250,7 @@ public class KeyValueFileStoreReadTest {
             RowType partitionType,
             RowType keyType,
             RowType valueType,
+            KeyFieldsExtractor extractor,
             MergeFunction<KeyValue> mergeFunction)
             throws Exception {
         SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri()));
@@ -254,6 +275,7 @@ public class KeyValueFileStoreReadTest {
                         partitionType,
                         keyType,
                         valueType,
+                        extractor,
                         mergeFunction)
                 .build();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index ea5bcda9..66698c66 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -71,6 +71,7 @@ public class KeyValueFileStoreScanTest {
                                 TestKeyValueGenerator.DEFAULT_PART_TYPE,
                                 TestKeyValueGenerator.KEY_TYPE,
                                 TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                                TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
                                 new DeduplicateMergeFunction())
                         .build();
         snapshotManager = store.snapshotManager();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
new file mode 100644
index 00000000..0e929c73
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import org.apache.flink.table.api.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SchemaEvolutionUtil}. */
+public class SchemaEvolutionUtilTest {
+    @Test
+    public void testCreateIndexMapping() {
+        List<DataField> dataFields =
+                Arrays.asList(
+                        new DataField(0, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                        new DataField(1, "b", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                        new DataField(2, "c", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                        new DataField(
+                                3, "d", new AtomicDataType(DataTypes.INT().getLogicalType())));
+        List<DataField> tableFields =
+                Arrays.asList(
+                        new DataField(1, "c", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                        new DataField(3, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                        new DataField(5, "d", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                        new DataField(
+                                6, "e", new AtomicDataType(DataTypes.INT().getLogicalType())));
+        int[] indexMapping = SchemaEvolutionUtil.createIndexMapping(tableFields, dataFields);
+
+        assertThat(indexMapping.length).isEqualTo(tableFields.size()).isEqualTo(4);
+        assertThat(indexMapping[0]).isEqualTo(1);
+        assertThat(indexMapping[1]).isEqualTo(3);
+        assertThat(indexMapping[2]).isLessThan(0);
+        assertThat(indexMapping[3]).isLessThan(0);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
new file mode 100644
index 00000000..2e63eea5
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.schema.AtomicDataType;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.format.FieldStats;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FieldStatsArraySerializer}. */
+public class FieldStatsArraySerializerTest {
+    @Test
+    public void testFromBinary() {
+        TableSchema dataSchema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(
+                                        0,
+                                        "a",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType())),
+                                new DataField(
+                                        1,
+                                        "b",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType())),
+                                new DataField(
+                                        2,
+                                        "c",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType())),
+                                new DataField(
+                                        3,
+                                        "d",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType()))),
+                        3,
+                        Collections.EMPTY_LIST,
+                        Collections.EMPTY_LIST,
+                        Collections.EMPTY_MAP,
+                        "");
+        TableSchema tableSchema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(
+                                        1,
+                                        "c",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType())),
+                                new DataField(
+                                        3,
+                                        "a",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType())),
+                                new DataField(
+                                        5,
+                                        "d",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType())),
+                                new DataField(
+                                        6,
+                                        "e",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType())),
+                                new DataField(
+                                        7,
+                                        "b",
+                                        new AtomicDataType(DataTypes.INT().getLogicalType()))),
+                        7,
+                        Collections.EMPTY_LIST,
+                        Collections.EMPTY_LIST,
+                        Collections.EMPTY_MAP,
+                        "");
+
+        FieldStatsArraySerializer fieldStatsArraySerializer =
+                new FieldStatsArraySerializer(
+                        tableSchema.logicalRowType(),
+                        SchemaEvolutionUtil.createIndexMapping(
+                                tableSchema.fields(), dataSchema.fields()));
+        BinaryRowData minRowData = row(1, 2, 3, 4);
+        BinaryRowData maxRowData = row(100, 99, 98, 97);
+        long[] nullCounts = new long[] {1, 0, 10, 100};
+        BinaryTableStats dataTableStats = new BinaryTableStats(minRowData, maxRowData, nullCounts);
+
+        FieldStats[] fieldStatsArray = dataTableStats.fields(fieldStatsArraySerializer, 1000L);
+        assertThat(fieldStatsArray.length).isEqualTo(tableSchema.fields().size()).isEqualTo(5);
+        checkFieldStats(fieldStatsArray[0], 2, 99, 0L);
+        checkFieldStats(fieldStatsArray[1], 4, 97, 100L);
+        checkFieldStats(fieldStatsArray[2], null, null, 1000L);
+        checkFieldStats(fieldStatsArray[3], null, null, 1000L);
+        checkFieldStats(fieldStatsArray[4], null, null, 1000L);
+    }
+
+    private void checkFieldStats(FieldStats fieldStats, Integer min, Integer max, Long nullCount) {
+        assertThat(fieldStats.minValue()).isEqualTo(min);
+        assertThat(fieldStats.maxValue()).isEqualTo(max);
+        assertThat(fieldStats.nullCount()).isEqualTo(nullCount);
+    }
+}
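
The test above relies on an index mapping from the evolved table schema to the schema of the data file, keyed by field id rather than by name or position. As a rough illustration of the behavior the assertions expect, here is a minimal, self-contained sketch of such a mapping; the Field class, createIndexMapping helper, and main method are hypothetical stand-ins, not the SchemaEvolutionUtil code from this commit.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class IndexMappingSketch {
        /** Field id and name, a hypothetical stand-in for DataField. */
        static class Field {
            final int id;
            final String name;

            Field(int id, String name) {
                this.id = id;
                this.name = name;
            }
        }

        /**
         * For each table field, look up the position of the data field with the same id.
         * A null entry means the column was added after the data file was written, so its
         * min/max are unknown and its null count falls back to the file's row count.
         */
        static Integer[] createIndexMapping(List<Field> tableFields, List<Field> dataFields) {
            Map<Integer, Integer> idToDataIndex = new HashMap<>();
            for (int i = 0; i < dataFields.size(); i++) {
                idToDataIndex.put(dataFields.get(i).id, i);
            }
            Integer[] mapping = new Integer[tableFields.size()];
            for (int i = 0; i < tableFields.size(); i++) {
                mapping[i] = idToDataIndex.get(tableFields.get(i).id); // null if absent
            }
            return mapping;
        }

        public static void main(String[] args) {
            List<Field> dataFields =
                    Arrays.asList(new Field(0, "a"), new Field(1, "b"), new Field(2, "c"), new Field(3, "d"));
            List<Field> tableFields =
                    Arrays.asList(
                            new Field(1, "c"), new Field(3, "a"), new Field(5, "d"),
                            new Field(6, "e"), new Field(7, "b"));
            // Prints [1, 3, null, null, null], matching the stats expected in testFromBinary.
            System.out.println(Arrays.toString(createIndexMapping(tableFields, dataFields)));
        }
    }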
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableFileMetaFilterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableFileMetaFilterTest.java
new file mode 100644
index 00000000..84166a03
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableFileMetaFilterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Map;
+
+/** Tests for meta files in {@link AppendOnlyFileStoreTable} with schema evolution. */
+public class AppendOnlyTableFileMetaFilterTest extends FileMetaFilterTestBase {
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+        SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+        return new AppendOnlyFileStoreTable(tablePath, schemaManager, schemaManager.latest().get());
+    }
+
+    @Override
+    protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+        return fileMeta.valueStats();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileMetaFilterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileMetaFilterTest.java
new file mode 100644
index 00000000..28667fd9
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileMetaFilterTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Tests for meta files in {@link ChangelogValueCountFileStoreTable} with schema evolution. */
+public class ChangelogValueCountFileMetaFilterTest extends FileMetaFilterTestBase {
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+    }
+
+    @Override
+    protected List<String> getPrimaryKeyNames() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+        SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+        return new ChangelogValueCountFileStoreTable(
+                tablePath, schemaManager, schemaManager.latest().get());
+    }
+
+    @Override
+    protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+        return fileMeta.keyStats();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileMetaFilterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileMetaFilterTest.java
new file mode 100644
index 00000000..b664f259
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileMetaFilterTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Tests for meta files in {@link ChangelogWithKeyFileStoreTable} with schema evolution. */
+public class ChangelogWithKeyFileMetaFilterTest extends FileMetaFilterTestBase {
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+    }
+
+    @Test
+    @Override
+    public void testTableScan() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    DataTableScan.DataFilePlan plan = table.newScan().plan();
+                    checkFilterRowCount(plan, 6L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    DataTableScan.DataFilePlan plan = table.newScan().plan();
+                    checkFilterRowCount(plan, 12L);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Test
+    @Override
+    public void testTableScanFilterExistFields() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    // filter rows where field "b" is in [14, 19] under SCHEMA_0_FIELDS;
+                    // "b" is renamed to "d" in SCHEMA_1_FIELDS
+                    Predicate predicate =
+                            new PredicateBuilder(table.schema().logicalRowType())
+                                    .between(2, 14, 19);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 6L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    PredicateBuilder builder =
+                            new PredicateBuilder(table.schema().logicalRowType());
+                    // filter rows where field "d" is in [14, 19] under SCHEMA_1_FIELDS
+                    Predicate predicate = builder.between(1, 14, 19);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 12L);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Test
+    @Override
+    public void testTableScanFilterNewFields() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    DataTableScan.DataFilePlan plan = table.newScan().plan();
+                    checkFilterRowCount(plan, 6L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    PredicateBuilder builder =
+                            new PredicateBuilder(table.schema().logicalRowType());
+                    // filter rows where field "a" is greater than 1120 under SCHEMA_1_FIELDS;
+                    // "a" does not exist in SCHEMA_0_FIELDS
+                    Predicate predicate = builder.greaterThan(3, 1120);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 12L);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+        SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+        return new ChangelogWithKeyFileStoreTable(
+                tablePath, schemaManager, schemaManager.latest().get());
+    }
+
+    @Override
+    protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+        throw new UnsupportedOperationException();
+    }
+}
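
A note on the overrides above: this table defines a primary key ("pt", "kt"), and the overridden scans only assert row counts (all 12 rows once both schemas have been written) instead of the per-column stats checks in the base class, with getTableValueStats left unsupported. As the expected counts show, the value-field predicates between(2, 14, 19) / between(1, 14, 19) and greaterThan(3, 1120) do not prune any data files here, presumably because file-meta filtering for primary-key tables is driven by key and partition stats rather than value stats.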
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
new file mode 100644
index 00000000..680ef984
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.AtomicDataType;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaChange;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.TraceableFileSystem;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for schema evolution in {@link FileStoreTable}. */
+public abstract class FileMetaFilterTestBase {
+    protected static final List<DataField> SCHEMA_0_FIELDS =
+            Arrays.asList(
+                    new DataField(0, "a", new AtomicDataType(DataTypes.STRING().getLogicalType())),
+                    new DataField(1, "pt", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                    new DataField(2, "b", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                    new DataField(3, "c", new AtomicDataType(DataTypes.STRING().getLogicalType())),
+                    new DataField(4, "kt", new AtomicDataType(DataTypes.BIGINT().getLogicalType())),
+                    new DataField(5, "d", new AtomicDataType(DataTypes.STRING().getLogicalType())));
+    protected static final List<DataField> SCHEMA_1_FIELDS =
+            Arrays.asList(
+                    new DataField(1, "pt", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                    new DataField(2, "d", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                    new DataField(4, "kt", new AtomicDataType(DataTypes.BIGINT().getLogicalType())),
+                    new DataField(6, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
+                    new DataField(7, "f", new AtomicDataType(DataTypes.STRING().getLogicalType())),
+                    new DataField(8, "b", new AtomicDataType(DataTypes.STRING().getLogicalType())));
+    protected static final List<String> PARTITION_NAMES = Collections.singletonList("pt");
+    protected static final List<String> PRIMARY_KEY_NAMES = Arrays.asList("pt", "kt");
+
+    protected Path tablePath;
+    protected String commitUser;
+    protected final Configuration tableConfig = new Configuration();
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @BeforeEach
+    public void before() throws Exception {
+        tablePath = new Path(TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+        commitUser = UUID.randomUUID().toString();
+        tableConfig.set(CoreOptions.PATH, tablePath.toString());
+        tableConfig.set(CoreOptions.BUCKET, 2);
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        // assert all connections are closed
+        FileSystem fileSystem = tablePath.getFileSystem();
+        assertThat(fileSystem).isInstanceOf(TraceableFileSystem.class);
+        TraceableFileSystem traceableFileSystem = (TraceableFileSystem) fileSystem;
+
+        java.util.function.Predicate<Path> pathPredicate =
+                path -> path.toString().contains(tempDir.toString());
+        assertThat(traceableFileSystem.openInputStreams(pathPredicate)).isEmpty();
+        assertThat(traceableFileSystem.openOutputStreams(pathPredicate)).isEmpty();
+    }
+
+    @Test
+    public void testTableScan() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    DataTableScan.DataFilePlan plan = table.newScan().plan();
+                    checkFilterRowCount(plan, 6L);
+                    return plan.splits.stream()
+                            .flatMap(s -> s.files().stream())
+                            .collect(Collectors.toList());
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    DataTableScan.DataFilePlan plan =
+                            table.newScan()
+                                    .withFilter(
+                                            new PredicateBuilder(table.schema().logicalRowType())
+                                                    .greaterOrEqual(1, 0))
+                                    .plan();
+                    checkFilterRowCount(plan, 12L);
+
+                    List<String> filesName =
+                            files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+                    assertThat(filesName.size()).isGreaterThan(0);
+
+                    List<DataFileMeta> fileMetaList =
+                            plan.splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    assertThat(
+                                    fileMetaList.stream()
+                                            .map(DataFileMeta::fileName)
+                                            .collect(Collectors.toList()))
+                            .containsAll(filesName);
+
+                    for (DataFileMeta fileMeta : fileMetaList) {
+                        FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+                        assertThat(statsArray.length).isEqualTo(6);
+                        if (filesName.contains(fileMeta.fileName())) {
+                            assertThat(statsArray[0].minValue()).isNotNull();
+                            assertThat(statsArray[0].maxValue()).isNotNull();
+                            assertThat(statsArray[1].minValue()).isNotNull();
+                            assertThat(statsArray[1].maxValue()).isNotNull();
+                            assertThat(statsArray[2].minValue()).isNotNull();
+                            assertThat(statsArray[2].maxValue()).isNotNull();
+
+                            assertThat(statsArray[3].minValue()).isNull();
+                            assertThat(statsArray[3].maxValue()).isNull();
+                            assertThat(statsArray[4].minValue()).isNull();
+                            assertThat(statsArray[4].maxValue()).isNull();
+                            assertThat(statsArray[5].minValue()).isNull();
+                            assertThat(statsArray[5].maxValue()).isNull();
+                        } else {
+                            assertThat(statsArray[0].minValue()).isNotNull();
+                            assertThat(statsArray[0].maxValue()).isNotNull();
+                            assertThat(statsArray[1].minValue()).isNotNull();
+                            assertThat(statsArray[1].maxValue()).isNotNull();
+                            assertThat(statsArray[2].minValue()).isNotNull();
+                            assertThat(statsArray[2].maxValue()).isNotNull();
+                            assertThat(statsArray[3].minValue()).isNotNull();
+                            assertThat(statsArray[3].maxValue()).isNotNull();
+                            assertThat(statsArray[4].minValue()).isNotNull();
+                            assertThat(statsArray[4].maxValue()).isNotNull();
+                            assertThat(statsArray[5].minValue()).isNotNull();
+                            assertThat(statsArray[5].maxValue()).isNotNull();
+                        }
+                    }
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Test
+    public void testTableScanFilterExistFields() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    // filter rows where field "b" is in [14, 19] under SCHEMA_0_FIELDS;
+                    // "b" is renamed to "d" in SCHEMA_1_FIELDS
+                    Predicate predicate =
+                            new PredicateBuilder(table.schema().logicalRowType())
+                                    .between(2, 14, 19);
+                    List<DataFileMeta> files =
+                            table.newScan().withFilter(predicate).plan().splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    assertThat(files.size()).isGreaterThan(0);
+                    checkFilterRowCount(files, 3L);
+                    return files;
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    PredicateBuilder builder =
+                            new PredicateBuilder(table.schema().logicalRowType());
+                    // filter rows where field "d" is in [14, 19] under SCHEMA_1_FIELDS
+                    Predicate predicate = builder.between(1, 14, 19);
+                    DataTableScan.DataFilePlan filterFilePlan =
+                            table.newScan().withFilter(predicate).plan();
+                    List<DataFileMeta> filterFileMetas =
+                            filterFilePlan.splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    checkFilterRowCount(filterFileMetas, 6L);
+
+                    List<String> fileNameList =
+                            filterFileMetas.stream()
+                                    .map(DataFileMeta::fileName)
+                                    .collect(Collectors.toList());
+                    Set<String> fileNames =
+                            filterFileMetas.stream()
+                                    .map(DataFileMeta::fileName)
+                                    .collect(Collectors.toSet());
+                    assertThat(fileNameList.size()).isEqualTo(fileNames.size());
+
+                    builder = new PredicateBuilder(table.schema().logicalRowType());
+                    // get all meta files with filter
+                    DataTableScan.DataFilePlan filterAllFilePlan =
+                            table.newScan().withFilter(builder.greaterOrEqual(1, 0)).plan();
+                    assertThat(
+                                    filterAllFilePlan.splits.stream()
+                                            .flatMap(
+                                                    s ->
+                                                            s.files().stream()
+                                                                    .map(DataFileMeta::fileName))
+                                            .collect(Collectors.toList()))
+                            .containsAll(
+                                    files.stream()
+                                            .map(DataFileMeta::fileName)
+                                            .collect(Collectors.toList()));
+
+                    // get all meta files without filter
+                    DataTableScan.DataFilePlan allFilePlan = table.newScan().plan();
+                    assertThat(filterAllFilePlan.splits).isEqualTo(allFilePlan.splits);
+
+                    Set<String> filterFileNames = new HashSet<>();
+                    for (DataSplit dataSplit : filterAllFilePlan.splits) {
+                        for (DataFileMeta dataFileMeta : dataSplit.files()) {
+                            FieldStats[] fieldStats = getTableValueStats(dataFileMeta).fields(null);
+                            int minValue = (Integer) fieldStats[1].minValue();
+                            int maxValue = (Integer) fieldStats[1].maxValue();
+                            if (minValue >= 14
+                                    && minValue <= 19
+                                    && maxValue >= 14
+                                    && maxValue <= 19) {
+                                filterFileNames.add(dataFileMeta.fileName());
+                            }
+                        }
+                    }
+                    assertThat(filterFileNames).isEqualTo(fileNames);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Test
+    public void testTableScanFilterNewFields() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    List<DataFileMeta> files =
+                            table.newScan().plan().splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    assertThat(files.size()).isGreaterThan(0);
+                    checkFilterRowCount(files, 6L);
+                    return files;
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    PredicateBuilder builder =
+                            new PredicateBuilder(table.schema().logicalRowType());
+
+                    // filter rows where field "a" is greater than 1120 under SCHEMA_1_FIELDS;
+                    // "a" does not exist in SCHEMA_0_FIELDS
+                    Predicate predicate = builder.greaterThan(3, 1120);
+                    DataTableScan.DataFilePlan filterFilePlan =
+                            table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(filterFilePlan, 2L);
+
+                    List<DataFileMeta> filterFileMetas =
+                            filterFilePlan.splits.stream()
+                                    .flatMap(s -> s.files().stream())
+                                    .collect(Collectors.toList());
+                    List<String> fileNameList =
+                            filterFileMetas.stream()
+                                    .map(DataFileMeta::fileName)
+                                    .collect(Collectors.toList());
+                    Set<String> fileNames =
+                            filterFileMetas.stream()
+                                    .map(DataFileMeta::fileName)
+                                    .collect(Collectors.toSet());
+                    assertThat(fileNameList.size()).isEqualTo(fileNames.size());
+
+                    List<String> filesName =
+                            files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+                    assertThat(fileNameList).doesNotContainAnyElementsOf(filesName);
+
+                    DataTableScan.DataFilePlan allFilePlan =
+                            table.newScan().withFilter(builder.greaterOrEqual(1, 0)).plan();
+                    checkFilterRowCount(allFilePlan, 12L);
+
+                    Set<String> filterFileNames = new HashSet<>();
+                    for (DataSplit dataSplit : allFilePlan.splits) {
+                        for (DataFileMeta dataFileMeta : dataSplit.files()) {
+                            FieldStats[] fieldStats = getTableValueStats(dataFileMeta).fields(null);
+                            Integer minValue = (Integer) fieldStats[3].minValue();
+                            Integer maxValue = (Integer) fieldStats[3].maxValue();
+                            if (minValue != null
+                                    && maxValue != null
+                                    && minValue > 1120
+                                    && maxValue > 1120) {
+                                filterFileNames.add(dataFileMeta.fileName());
+                            }
+                        }
+                    }
+                    assertThat(filterFileNames).isEqualTo(fileNames);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Test
+    public void testTableScanFilterPartition() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    checkFilterRowCount(table, 1, 1, 3L);
+                    checkFilterRowCount(table, 1, 2, 3L);
+                    return null;
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    checkFilterRowCount(table, 0, 1, 7L);
+                    checkFilterRowCount(table, 0, 2, 5L);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    @Test
+    public void testTableScanFilterPrimaryKey() throws Exception {
+        writeAndCheckFileMeta(
+                schemas -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    PredicateBuilder builder =
+                            new PredicateBuilder(table.schema().logicalRowType());
+                    Predicate predicate = builder.between(4, 115L, 120L);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 2L);
+                    return null;
+                },
+                (files, schemas) -> {
+                    FileStoreTable table = createFileStoreTable(schemas);
+                    PredicateBuilder builder =
+                            new PredicateBuilder(table.schema().logicalRowType());
+                    Predicate predicate = builder.between(2, 115L, 120L);
+                    DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+                    checkFilterRowCount(plan, 6L);
+                },
+                getPrimaryKeyNames(),
+                tableConfig,
+                this::createFileStoreTable);
+    }
+
+    protected List<String> getPrimaryKeyNames() {
+        return PRIMARY_KEY_NAMES;
+    }
+
+    protected abstract FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas);
+
+    protected abstract BinaryTableStats getTableValueStats(DataFileMeta fileMeta);
+
+    public static <R> void writeAndCheckFileMeta(
+            Function<Map<Long, TableSchema>, R> firstChecker,
+            BiConsumer<R, Map<Long, TableSchema>> secondChecker,
+            List<String> primaryKeyNames,
+            Configuration tableConfig,
+            Function<Map<Long, TableSchema>, FileStoreTable> createFileStoreTable)
+            throws Exception {
+        Map<Long, TableSchema> tableSchemas = new HashMap<>();
+        tableSchemas.put(
+                0L,
+                new TableSchema(
+                        0,
+                        SCHEMA_0_FIELDS,
+                        5,
+                        PARTITION_NAMES,
+                        primaryKeyNames,
+                        tableConfig.toMap(),
+                        ""));
+        FileStoreTable table = createFileStoreTable.apply(tableSchemas);
+        TableWrite write = table.newWrite("user");
+        TableCommit commit = table.newCommit("user");
+
+        write.write(
+                GenericRowData.of(
+                        StringData.fromString("S001"),
+                        1,
+                        11,
+                        StringData.fromString("S11"),
+                        111L,
+                        StringData.fromString("S111")));
+        write.write(
+                GenericRowData.of(
+                        StringData.fromString("S002"),
+                        2,
+                        12,
+                        StringData.fromString("S12"),
+                        112L,
+                        StringData.fromString("S112")));
+        write.write(
+                GenericRowData.of(
+                        StringData.fromString("S003"),
+                        1,
+                        13,
+                        StringData.fromString("S13"),
+                        113L,
+                        StringData.fromString("S113")));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(
+                GenericRowData.of(
+                        StringData.fromString("S004"),
+                        1,
+                        14,
+                        StringData.fromString("S14"),
+                        114L,
+                        StringData.fromString("S114")));
+        write.write(
+                GenericRowData.of(
+                        StringData.fromString("S005"),
+                        2,
+                        15,
+                        StringData.fromString("S15"),
+                        115L,
+                        StringData.fromString("S115")));
+        write.write(
+                GenericRowData.of(
+                        StringData.fromString("S006"),
+                        2,
+                        16,
+                        StringData.fromString("S16"),
+                        116L,
+                        StringData.fromString("S116")));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        R result = firstChecker.apply(tableSchemas);
+
+        tableSchemas.put(
+                1L,
+                new TableSchema(
+                        1,
+                        SCHEMA_1_FIELDS,
+                        8,
+                        PARTITION_NAMES,
+                        primaryKeyNames,
+                        tableConfig.toMap(),
+                        ""));
+        table = createFileStoreTable.apply(tableSchemas);
+        write = table.newWrite("user");
+        commit = table.newCommit("user");
+
+        write.write(
+                GenericRowData.of(
+                        1,
+                        17,
+                        117L,
+                        1117,
+                        StringData.fromString("S007"),
+                        StringData.fromString("S17")));
+        write.write(
+                GenericRowData.of(
+                        2,
+                        18,
+                        118L,
+                        1118,
+                        StringData.fromString("S008"),
+                        StringData.fromString("S18")));
+        write.write(
+                GenericRowData.of(
+                        1,
+                        19,
+                        119L,
+                        1119,
+                        StringData.fromString("S009"),
+                        StringData.fromString("S19")));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(
+                GenericRowData.of(
+                        2,
+                        20,
+                        120L,
+                        1120,
+                        StringData.fromString("S010"),
+                        StringData.fromString("S20")));
+        write.write(
+                GenericRowData.of(
+                        1,
+                        21,
+                        121L,
+                        1121,
+                        StringData.fromString("S011"),
+                        StringData.fromString("S21")));
+        write.write(
+                GenericRowData.of(
+                        1,
+                        22,
+                        122L,
+                        1122,
+                        StringData.fromString("S012"),
+                        StringData.fromString("S22")));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+
+        secondChecker.accept(result, tableSchemas);
+    }
+
+    protected static void checkFilterRowCount(
+            FileStoreTable table, int index, int value, long expectedRowCount) {
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
+        DataTableScan.DataFilePlan plan =
+                table.newScan().withFilter(builder.equal(index, value)).plan();
+        checkFilterRowCount(plan, expectedRowCount);
+    }
+
+    protected static void checkFilterRowCount(
+            DataTableScan.DataFilePlan plan, long expectedRowCount) {
+        List<DataFileMeta> fileMetaList =
+                plan.splits.stream().flatMap(s -> s.files().stream()).collect(Collectors.toList());
+        checkFilterRowCount(fileMetaList, expectedRowCount);
+    }
+
+    protected static void checkFilterRowCount(
+            List<DataFileMeta> fileMetaList, long expectedRowCount) {
+        assertThat(fileMetaList.stream().mapToLong(DataFileMeta::rowCount).sum())
+                .isEqualTo(expectedRowCount);
+    }
+
+    /** {@link SchemaManager} subclass for testing. */
+    protected static class TestingSchemaManager extends SchemaManager {
+        private final Map<Long, TableSchema> tableSchemas;
+
+        public TestingSchemaManager(Path tableRoot, Map<Long, TableSchema> tableSchemas) {
+            super(tableRoot);
+            this.tableSchemas = tableSchemas;
+        }
+
+        @Override
+        public Optional<TableSchema> latest() {
+            return Optional.of(
+                    tableSchemas.get(
+                            tableSchemas.keySet().stream()
+                                    .max(Long::compareTo)
+                                    .orElseThrow(IllegalStateException::new)));
+        }
+
+        @Override
+        public List<TableSchema> listAll() {
+            return new ArrayList<>(tableSchemas.values());
+        }
+
+        @Override
+        public List<Long> listAllIds() {
+            return new ArrayList<>(tableSchemas.keySet());
+        }
+
+        @Override
+        public TableSchema commitNewVersion(UpdateSchema updateSchema) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TableSchema schema(long id) {
+            return checkNotNull(tableSchemas.get(id));
+        }
+    }
+}
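
To summarize what the base class exercises: six rows are written under SCHEMA_0_FIELDS, the schema is then swapped to SCHEMA_1_FIELDS through TestingSchemaManager, and six more rows are written. Because PredicateBuilder addresses columns by position in the current schema, the same logical column needs a different index before and after the evolution. A minimal sketch, where table0 and table1 are hypothetical tables created from the two schemas:

    // Under SCHEMA_0_FIELDS, "b" is the field at index 2.
    Predicate onSchema0 =
            new PredicateBuilder(table0.schema().logicalRowType()).between(2, 14, 19);

    // After evolution, the same column (field id 2) is renamed to "d" and sits at index 1
    // in SCHEMA_1_FIELDS, so the predicate index changes.
    Predicate onSchema1 =
            new PredicateBuilder(table1.schema().logicalRowType()).between(1, 14, 19);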
