This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new aa7ff79  [FLINK-26346] Add statistics collecting to sst files
aa7ff79 is described below

commit aa7ff797e367df5f63ff8e8b38b6fe0e6f4c9206
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 9 11:05:42 2022 +0800

    [FLINK-26346] Add statistics collecting to sst files
    
    This closes #33
---
 .../table/store/connector/sink/TestFileStore.java  |   3 +
 .../source/FileStoreSourceSplitGeneratorTest.java  |   1 +
 .../source/FileStoreSourceSplitSerializerTest.java |   1 +
 .../store/connector/source/TestDataReadWrite.java  |   2 +-
 .../flink/table/store/file/FileStoreOptions.java   |   1 +
 .../table/store/file/{ => format}/FileFormat.java  |  26 ++-
 .../table/store/file/format/FileFormatFactory.java |  29 +++
 .../store/file/{ => format}/FileFormatImpl.java    |   2 +-
 .../table/store/file/manifest/ManifestFile.java    |   2 +-
 .../table/store/file/manifest/ManifestList.java    |   2 +-
 .../store/file/mergetree/sst/SstFileMeta.java      |  36 ++--
 .../file/mergetree/sst/SstFileMetaSerializer.java  |  17 +-
 .../store/file/mergetree/sst/SstFileReader.java    |   2 +-
 .../store/file/mergetree/sst/SstFileWriter.java    |  86 +++++++--
 .../store/file/operation/FileStoreReadImpl.java    |   2 +-
 .../store/file/operation/FileStoreWriteImpl.java   |   2 +-
 .../store/file/stats/FieldStatsCollector.java      |  11 +-
 .../table/store/file/stats/FileStatsExtractor.java |  29 +++
 .../flink/table/store/file/FileFormatTest.java     |   2 +
 .../FileStatsExtractingAvroFormat.java}            |  40 ++--
 .../FileStatsExtractingAvroFormatFactory.java      |  35 ++++
 .../FlushingFileFormat.java}                       |  23 ++-
 .../ManifestCommittableSerializerTest.java         |   3 +-
 .../store/file/manifest/ManifestFileMetaTest.java  |   3 +-
 .../store/file/manifest/ManifestFileTest.java      |  11 +-
 .../store/file/manifest/ManifestListTest.java      |   2 +-
 .../table/store/file/mergetree/LevelsTest.java     |   2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |   6 +-
 .../file/mergetree/compact/CompactManagerTest.java |  11 +-
 .../mergetree/compact/IntervalPartitionTest.java   |   1 +
 .../mergetree/compact/UniversalCompactionTest.java |   2 +-
 .../store/file/mergetree/sst/SstFileTest.java      |  60 +++---
 .../file/mergetree/sst/SstTestDataGenerator.java   |  11 +-
 .../file/stats/FileStatsExtractorTestBase.java     | 209 +++++++++++++++++++++
 .../table/store/file/stats/StatsTestUtils.java     |   9 +-
 .../store/file/stats/TestFileStatsExtractor.java   |  75 ++++++++
 ...flink.table.store.file.format.FileFormatFactory |  16 ++
 flink-table-store-orc/pom.xml                      | 116 ++++++++++++
 .../flink/table/store/orc/OrcFileFormat.java       |  51 ++---
 .../table/store/orc/OrcFileFormatFactory.java      |  37 ++++
 .../table/store/orc/OrcFileStatsExtractor.java     | 191 +++++++++++++++++++
 ...flink.table.store.file.format.FileFormatFactory |  16 ++
 .../table/store/orc/OrcFileStatsExtractorTest.java |  68 +++++++
 pom.xml                                            |   2 +
 44 files changed, 1096 insertions(+), 160 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index db7a3de..6b38312 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -185,6 +185,9 @@ public class TestFileStore implements FileStore {
                                                     null,
                                                     null,
                                                     new FieldStats[] {
+                                                        new FieldStats(null, null, 0)
+                                                    },
+                                                    new FieldStats[] {
                                                         new FieldStats(null, null, 0),
                                                         new FieldStats(null, null, 0),
                                                         new FieldStats(null, null, 0)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 355ddea..2df9a9f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -111,6 +111,7 @@ public class FileStoreSourceSplitGeneratorTest {
                         null, // not used
                         null, // not used
                         new FieldStats[] {new FieldStats(null, null, 0)}, // not used
+                        new FieldStats[] {new FieldStats(null, null, 0)}, // not used
                         0, // not used
                         0, // not used
                         0 // not used
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 3772b90..0166f8f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -81,6 +81,7 @@ public class FileStoreSourceSplitSerializerTest {
                 row(0),
                 row(0),
                 new FieldStats[] {new FieldStats(null, null, 0)},
+                new FieldStats[] {new FieldStats(null, null, 0)},
                 0,
                 1,
                 level);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index aba6f5f..0f74f19 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 0233e2d..bf1b0e1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 
 import java.io.Serializable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
similarity index 83%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
index 4309284..a403ac2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file;
+package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.ConfigOption;
@@ -28,10 +28,13 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
 
 /** Factory class which creates reader and writer factories for specific file format. */
 public interface FileFormat {
@@ -62,14 +65,12 @@ public interface FileFormat {
         return createReaderFactory(rowType, projection, new ArrayList<>());
     }
 
-    /** Create a {@link FileFormatImpl} from format identifier and format options. */
-    static FileFormatImpl fromIdentifier(
-            ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
-        return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
+    default Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+        return Optional.empty();
     }
 
     /** Create a {@link FileFormatImpl} from table options. */
-    static FileFormatImpl fromTableOptions(
+    static FileFormat fromTableOptions(
             ClassLoader classLoader,
             Configuration tableOptions,
             ConfigOption<String> formatOption) {
@@ -78,4 +79,17 @@ public interface FileFormat {
         ReadableConfig formatOptions = new DelegatingConfiguration(tableOptions, formatPrefix);
         return fromIdentifier(classLoader, formatIdentifier, formatOptions);
     }
+
+    /** Create a {@link FileFormatImpl} from format identifier and format options. */
+    static FileFormat fromIdentifier(
+            ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
+        ServiceLoader<FileFormatFactory> serviceLoader =
+                ServiceLoader.load(FileFormatFactory.class);
+        for (FileFormatFactory factory : serviceLoader) {
+            if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
+                return factory.create(classLoader, formatOptions);
+            }
+        }
+        return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
+    }
 }
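
For context on how the new lookup behaves: fromIdentifier now consults ServiceLoader for a FileFormatFactory whose identifier() matches the lower-cased format name, and only falls back to the reflective FileFormatImpl when no factory is registered. A minimal caller-side sketch (not part of this commit; assumes the ORC factory added below is on the classpath):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.store.file.stats.FileStatsExtractor;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.Optional;

    public class FormatLookupSketch {
        public static void main(String[] args) {
            // Resolved through ServiceLoader when a FileFormatFactory with
            // identifier "orc" is on the classpath; otherwise this falls back
            // to the reflective FileFormatImpl.
            FileFormat format =
                    FileFormat.fromIdentifier(
                            FormatLookupSketch.class.getClassLoader(), "orc", new Configuration());

            // A format may optionally provide a stats extractor; the
            // interface default returns Optional.empty().
            Optional<FileStatsExtractor> extractor =
                    format.createStatsExtractor(RowType.of(new IntType()));
            System.out.println("stats extractor available: " + extractor.isPresent());
        }
    }
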
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
new file mode 100644
index 0000000..58204df
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.format;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link FileFormat}. */
+public interface FileFormatFactory {
+
+    String identifier();
+
+    FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions);
+}
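
The contract for third-party formats is deliberately small: implement these two methods and register the class in a META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory file, as the two service files in this commit do for the test and ORC factories. A hypothetical sketch (MyFormatFactory and the "my-format" identifier are illustrative only; create() delegates to the built-in avro support to stay self-contained):

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.store.file.format.FileFormatFactory;

    /** Hypothetical factory, discovered through ServiceLoader by FileFormat.fromIdentifier. */
    public class MyFormatFactory implements FileFormatFactory {

        @Override
        public String identifier() {
            // Matched against the lower-cased format identifier, so keep it lower case.
            return "my-format";
        }

        @Override
        public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
            // A real factory would construct its own FileFormat here.
            return FileFormat.fromIdentifier(classLoader, "avro", formatOptions);
        }
    }
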
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
similarity index 99%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
index cdcdd84..1846e04 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file;
+package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index cbc1571..49f69fe 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -25,7 +25,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.stats.FieldStatsCollector;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index bde36a9..e15f546 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
index da419e8..3a8db5e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
@@ -42,7 +42,8 @@ public class SstFileMeta {
 
     private final BinaryRowData minKey;
     private final BinaryRowData maxKey;
-    private final FieldStats[] stats;
+    private final FieldStats[] keyStats;
+    private final FieldStats[] valueStats;
 
     private final long minSequenceNumber;
     private final long maxSequenceNumber;
@@ -54,7 +55,8 @@ public class SstFileMeta {
             long rowCount,
             BinaryRowData minKey,
             BinaryRowData maxKey,
-            FieldStats[] stats,
+            FieldStats[] keyStats,
+            FieldStats[] valueStats,
             long minSequenceNumber,
             long maxSequenceNumber,
             int level) {
@@ -64,7 +66,8 @@ public class SstFileMeta {
 
         this.minKey = minKey;
         this.maxKey = maxKey;
-        this.stats = stats;
+        this.keyStats = keyStats;
+        this.valueStats = valueStats;
 
         this.minSequenceNumber = minSequenceNumber;
         this.maxSequenceNumber = maxSequenceNumber;
@@ -91,9 +94,12 @@ public class SstFileMeta {
         return maxKey;
     }
 
-    /** Element in the array may be null, indicating the statistics of this field is unknown. */
-    public FieldStats[] stats() {
-        return stats;
+    public FieldStats[] keyStats() {
+        return keyStats;
+    }
+
+    public FieldStats[] valueStats() {
+        return valueStats;
     }
 
     public long minSequenceNumber() {
@@ -116,7 +122,8 @@ public class SstFileMeta {
                 rowCount,
                 minKey,
                 maxKey,
-                stats,
+                keyStats,
+                valueStats,
                 minSequenceNumber,
                 maxSequenceNumber,
                 newLevel);
@@ -133,7 +140,8 @@ public class SstFileMeta {
                 && rowCount == that.rowCount
                 && Objects.equals(minKey, that.minKey)
                 && Objects.equals(maxKey, that.maxKey)
-                && Arrays.equals(stats, that.stats)
+                && Arrays.equals(keyStats, that.keyStats)
+                && Arrays.equals(valueStats, that.valueStats)
                 && minSequenceNumber == that.minSequenceNumber
                 && maxSequenceNumber == that.maxSequenceNumber
                 && level == that.level;
@@ -149,7 +157,8 @@ public class SstFileMeta {
                 maxKey,
                 // by default, hash code of arrays are computed by reference, not by content.
                 // so we must use Arrays.hashCode to hash by content.
-                Arrays.hashCode(stats),
+                Arrays.hashCode(keyStats),
+                Arrays.hashCode(valueStats),
                 minSequenceNumber,
                 maxSequenceNumber,
                 level);
@@ -158,13 +167,14 @@ public class SstFileMeta {
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %s, %s, %s, %d, %d, %d}",
+                "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d}",
                 fileName,
                 fileSize,
                 rowCount,
                 minKey,
                 maxKey,
-                Arrays.toString(stats),
+                Arrays.toString(keyStats),
+                Arrays.toString(valueStats),
                 minSequenceNumber,
                 maxSequenceNumber,
                 level);
@@ -177,7 +187,9 @@ public class SstFileMeta {
         fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
         fields.add(new RowType.RowField("_MIN_KEY", keyType));
         fields.add(new RowType.RowField("_MAX_KEY", keyType));
-        fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(valueType)));
+        fields.add(new RowType.RowField("_KEY_STATS", FieldStatsArraySerializer.schema(keyType)));
+        fields.add(
+                new RowType.RowField("_VALUE_STATS", FieldStatsArraySerializer.schema(valueType)));
         fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
         fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
         fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
index d03a71e..9e4dbe6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
@@ -32,12 +32,14 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
     private static final long serialVersionUID = 1L;
 
     private final RowDataSerializer keySerializer;
-    private final FieldStatsArraySerializer statsArraySerializer;
+    private final FieldStatsArraySerializer keyStatsArraySerializer;
+    private final FieldStatsArraySerializer valueStatsArraySerializer;
 
     public SstFileMetaSerializer(RowType keyType, RowType valueType) {
         super(SstFileMeta.schema(keyType, valueType));
         this.keySerializer = new RowDataSerializer(keyType);
-        this.statsArraySerializer = new FieldStatsArraySerializer(valueType);
+        this.keyStatsArraySerializer = new FieldStatsArraySerializer(keyType);
+        this.valueStatsArraySerializer = new FieldStatsArraySerializer(valueType);
     }
 
     @Override
@@ -48,7 +50,8 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
                 meta.rowCount(),
                 meta.minKey(),
                 meta.maxKey(),
-                statsArraySerializer.toRow(meta.stats()),
+                keyStatsArraySerializer.toRow(meta.keyStats()),
+                valueStatsArraySerializer.toRow(meta.valueStats()),
                 meta.minSequenceNumber(),
                 meta.maxSequenceNumber(),
                 meta.level());
@@ -63,9 +66,11 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
                 row.getLong(2),
                 keySerializer.toBinaryRow(row.getRow(3, keyFieldCount)).copy(),
                 keySerializer.toBinaryRow(row.getRow(4, keyFieldCount)).copy(),
-                statsArraySerializer.fromRow(row.getRow(5, statsArraySerializer.numFields())),
-                row.getLong(6),
+                keyStatsArraySerializer.fromRow(row.getRow(5, keyStatsArraySerializer.numFields())),
+                valueStatsArraySerializer.fromRow(
+                        row.getRow(6, valueStatsArraySerializer.numFields())),
                 row.getLong(7),
-                row.getInt(8));
+                row.getLong(8),
+                row.getInt(9));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
index f9ce7f3..b3c0a59 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
@@ -25,9 +25,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RecordReader;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
index 5c18aa6..afc7e19 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
@@ -25,10 +25,12 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.RollingFile;
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /** Writes {@link KeyValue}s into sst files. */
@@ -50,6 +53,7 @@ public class SstFileWriter {
     private final RowType keyType;
     private final RowType valueType;
     private final BulkWriter.Factory<RowData> writerFactory;
+    private final FileStatsExtractor fileStatsExtractor;
     private final SstPathFactory pathFactory;
     private final long suggestedFileSize;
 
@@ -57,11 +61,13 @@ public class SstFileWriter {
             RowType keyType,
             RowType valueType,
             BulkWriter.Factory<RowData> writerFactory,
+            FileStatsExtractor fileStatsExtractor,
             SstPathFactory pathFactory,
             long suggestedFileSize) {
         this.keyType = keyType;
         this.valueType = valueType;
         this.writerFactory = writerFactory;
+        this.fileStatsExtractor = fileStatsExtractor;
         this.pathFactory = pathFactory;
         this.suggestedFileSize = suggestedFileSize;
     }
@@ -91,7 +97,10 @@ public class SstFileWriter {
      */
     public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
             throws Exception {
-        SstRollingFile rollingFile = new StatsCollectingRollingFile(level);
+        SstRollingFile rollingFile =
+                fileStatsExtractor == null
+                        ? new StatsCollectingRollingFile(level)
+                        : new FileExtractingRollingFile(level);
         List<SstFileMeta> result = new ArrayList<>();
         List<Path> filesToCleanUp = new ArrayList<>();
         try {
@@ -161,6 +170,7 @@ public class SstFileWriter {
 
         @Override
         protected SstFileMeta collectFile(Path path) throws IOException {
+            KeyAndValueStats stats = extractStats(path);
             SstFileMeta result =
                     new SstFileMeta(
                             path.getName(),
@@ -168,7 +178,8 @@ public class SstFileWriter {
                             rowCount,
                             minKey,
                             keySerializer.toBinaryRow(maxKey).copy(),
-                            collectStats(path),
+                            stats.keyStats,
+                            stats.valueStats,
                             minSequenceNumber,
                             maxSequenceNumber,
                             level);
@@ -176,7 +187,7 @@ public class SstFileWriter {
             return result;
         }
 
-        private void resetMeta() {
+        protected void resetMeta() {
             rowCount = 0;
             minKey = null;
             maxKey = null;
@@ -184,26 +195,68 @@ public class SstFileWriter {
             maxSequenceNumber = Long.MIN_VALUE;
         }
 
-        protected abstract FieldStats[] collectStats(Path path);
+        protected abstract KeyAndValueStats extractStats(Path path);
+    }
+
+    private class FileExtractingRollingFile extends SstRollingFile {
+
+        private FileExtractingRollingFile(int level) {
+            super(level);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            FieldStats[] rawStats;
+            try {
+                rawStats = fileStatsExtractor.extract(path);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            int numKeyFields = keyType.getFieldCount();
+            return new KeyAndValueStats(
+                    Arrays.copyOfRange(rawStats, 0, numKeyFields),
+                    Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
+        }
     }
 
     private class StatsCollectingRollingFile extends SstRollingFile {
 
+        private FieldStatsCollector keyStatsCollector;
+        private FieldStatsCollector valueStatsCollector;
+
         private StatsCollectingRollingFile(int level) {
             super(level);
         }
 
         @Override
-        protected FieldStats[] collectStats(Path path) {
-            // TODO
-            //  1. Read statistics directly from the written orc/parquet files.
-            //  2. For other file formats use StatsCollector. Make sure fields are not reused
-            //     otherwise we need copying.
-            FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
-            for (int i = 0; i < stats.length; i++) {
-                stats[i] = new FieldStats(null, null, 0);
-            }
-            return stats;
+        protected RowData toRowData(KeyValue kv) {
+            keyStatsCollector.collect(kv.key());
+            valueStatsCollector.collect(kv.value());
+            return super.toRowData(kv);
+        }
+
+        @Override
+        protected KeyAndValueStats extractStats(Path path) {
+            return new KeyAndValueStats(keyStatsCollector.extract(), valueStatsCollector.extract());
+        }
+
+        @Override
+        protected void resetMeta() {
+            super.resetMeta();
+            keyStatsCollector = new FieldStatsCollector(keyType);
+            valueStatsCollector = new FieldStatsCollector(valueType);
+        }
+    }
+
+    private static class KeyAndValueStats {
+
+        private final FieldStats[] keyStats;
+        private final FieldStats[] valueStats;
+
+        private KeyAndValueStats(FieldStats[] keyStats, FieldStats[] valueStats) {
+            this.keyStats = keyStats;
+            this.valueStats = valueStats;
         }
     }
 
@@ -213,6 +266,7 @@ public class SstFileWriter {
         private final RowType keyType;
         private final RowType valueType;
         private final BulkWriter.Factory<RowData> writerFactory;
+        private final FileStatsExtractor fileStatsExtractor;
         private final FileStorePathFactory pathFactory;
         private final long suggestedFileSize;
 
@@ -226,6 +280,7 @@ public class SstFileWriter {
             this.valueType = valueType;
             RowType recordType = KeyValue.schema(keyType, valueType);
             this.writerFactory = fileFormat.createWriterFactory(recordType);
+            this.fileStatsExtractor = fileFormat.createStatsExtractor(recordType).orElse(null);
             this.pathFactory = pathFactory;
             this.suggestedFileSize = suggestedFileSize;
         }
@@ -235,6 +290,7 @@ public class SstFileWriter {
                     keyType,
                     valueType,
                     writerFactory,
+                    fileStatsExtractor,
                     pathFactory.createSstPathFactory(partition, bucket),
                     suggestedFileSize);
         }
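
One detail worth calling out in FileExtractingRollingFile above: the "numKeyFields + 2" offset. The writer persists rows in the shape of KeyValue.schema(keyType, valueType), which, judging by that offset, places two bookkeeping columns (sequence number and value kind) between the key fields and the value fields, so the raw per-file stats have to be sliced around them. An illustration with hypothetical values:

    import org.apache.flink.table.store.file.stats.FieldStats;

    import java.util.Arrays;

    public class StatsSlicingSketch {
        public static void main(String[] args) {
            // Hypothetical raw stats for one key field and one value field;
            // the two middle entries stand for the sequence-number and
            // value-kind columns of the KeyValue schema (an assumption based
            // on the "numKeyFields + 2" offset above).
            FieldStats[] rawStats = {
                new FieldStats(1, 9, 0), // key field
                new FieldStats(0L, 100L, 0), // sequence number
                new FieldStats((byte) 0, (byte) 1, 0), // value kind
                new FieldStats("a", "z", 2) // value field
            };
            int numKeyFields = 1;
            FieldStats[] keyStats = Arrays.copyOfRange(rawStats, 0, numKeyFields);
            FieldStats[] valueStats = Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length);
            System.out.println(Arrays.toString(keyStats) + " / " + Arrays.toString(valueStats));
        }
    }
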
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index 0df5aa7..2d69773 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 7bee391..441068a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
index 89ed0f9..8acb126 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.store.file.stats;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -29,6 +31,7 @@ public class FieldStatsCollector {
     private final Object[] maxValues;
     private final long[] nullCounts;
     private final RowDataToObjectArrayConverter converter;
+    private final TypeSerializer<Object>[] fieldSerializers;
 
     public FieldStatsCollector(RowType rowType) {
         int numFields = rowType.getFieldCount();
@@ -36,6 +39,10 @@ public class FieldStatsCollector {
         this.maxValues = new Object[numFields];
         this.nullCounts = new long[numFields];
         this.converter = new RowDataToObjectArrayConverter(rowType);
+        this.fieldSerializers = new TypeSerializer[numFields];
+        for (int i = 0; i < numFields; i++) {
+            fieldSerializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
+        }
     }
 
     /**
@@ -59,10 +66,10 @@ public class FieldStatsCollector {
             }
             Comparable<Object> c = (Comparable<Object>) obj;
             if (minValues[i] == null || c.compareTo(minValues[i]) < 0) {
-                minValues[i] = c;
+                minValues[i] = fieldSerializers[i].copy(c);
             }
             if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) {
-                maxValues[i] = c;
+                maxValues[i] = fieldSerializers[i].copy(c);
             }
         }
     }
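
The serializer-based copies above matter because the RowData instances passed to collect() may be reused by the caller between rows; without a deep copy, a remembered min or max could be silently mutated afterwards. A minimal usage sketch (the printed form of FieldStats is an assumption):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.store.file.stats.FieldStats;
    import org.apache.flink.table.store.file.stats.FieldStatsCollector;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    public class StatsCollectorSketch {
        public static void main(String[] args) {
            FieldStatsCollector collector = new FieldStatsCollector(RowType.of(new IntType()));
            collector.collect(GenericRowData.of(3));
            collector.collect(GenericRowData.of(1));
            collector.collect(GenericRowData.of((Object) null)); // counted as null, skips min/max
            FieldStats[] stats = collector.extract();
            // Expected: min 1, max 3, null count 1 for the single field.
            System.out.println(stats[0]);
        }
    }
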
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
new file mode 100644
index 0000000..3f56d3b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/** Extracts statistics directly from file. */
+public interface FileStatsExtractor {
+
+    FieldStats[] extract(Path path) throws IOException;
+}
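
The extractor contract: given the path of a file the writer just produced, return one FieldStats per field of the written row type. A hypothetical placeholder implementation that reports every field as unknown (null min/max), mirroring what SstFileWriter produced before this commit:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.file.stats.FieldStats;
    import org.apache.flink.table.store.file.stats.FileStatsExtractor;
    import org.apache.flink.table.types.logical.RowType;

    import java.io.IOException;

    /** Hypothetical extractor: marks the statistics of every field as unknown. */
    public class UnknownStatsExtractor implements FileStatsExtractor {

        private final RowType rowType;

        public UnknownStatsExtractor(RowType rowType) {
            this.rowType = rowType;
        }

        @Override
        public FieldStats[] extract(Path path) throws IOException {
            // Null min/max marks a field's statistics as unknown.
            FieldStats[] stats = new FieldStats[rowType.getFieldCount()];
            for (int i = 0; i < stats.length; i++) {
                stats[i] = new FieldStats(null, null, 0);
            }
            return stats;
        }
    }
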
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
index 32eb230..9102e6f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormatImpl;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
similarity index 59%
copy from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
copy to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index a6ff06b..77e9807 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.Configuration;
@@ -24,19 +24,21 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
-/** A special avro {@link FileFormat} which flushes for every added element. */
-public class FlushingAvroFormat implements FileFormat {
+/** An avro {@link FileFormat} for test. It provides a {@link FileStatsExtractor}. */
+public class FileStatsExtractingAvroFormat implements FileFormat {
 
     private final FileFormat avro =
             FileFormat.fromIdentifier(
-                    SstFileTest.class.getClassLoader(), "avro", new Configuration());
+                    FileStatsExtractingAvroFormat.class.getClassLoader(),
+                    "avro",
+                    new Configuration());
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
@@ -46,25 +48,11 @@ public class FlushingAvroFormat implements FileFormat {
 
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
-        return fsDataOutputStream -> {
-            BulkWriter<RowData> wrapped = avro.createWriterFactory(type).create(fsDataOutputStream);
-            return new BulkWriter<RowData>() {
-                @Override
-                public void addElement(RowData rowData) throws IOException {
-                    wrapped.addElement(rowData);
-                    wrapped.flush();
-                }
-
-                @Override
-                public void flush() throws IOException {
-                    wrapped.flush();
-                }
+        return avro.createWriterFactory(type);
+    }
 
-                @Override
-                public void finish() throws IOException {
-                    wrapped.finish();
-                }
-            };
-        };
+    @Override
+    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+        return Optional.of(new TestFileStatsExtractor(avro, type));
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
new file mode 100644
index 0000000..1b8fdef
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.format;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link FileStatsExtractingAvroFormat}. */
+public class FileStatsExtractingAvroFormatFactory implements FileFormatFactory {
+
+    @Override
+    public String identifier() {
+        return "avro-extract";
+    }
+
+    @Override
+    public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
+        return new FileStatsExtractingAvroFormat();
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
similarity index 76%
copy from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
copy to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index a6ff06b..fb8f90c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.format;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.configuration.Configuration;
@@ -24,30 +24,33 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
 import java.util.List;
 
-/** A special avro {@link FileFormat} which flushes for every added element. */
-public class FlushingAvroFormat implements FileFormat {
+/** A special {@link FileFormat} which flushes for every added element. */
+public class FlushingFileFormat implements FileFormat {
 
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    SstFileTest.class.getClassLoader(), "avro", new Configuration());
+    private final FileFormat format;
+
+    public FlushingFileFormat(String identifier) {
+        this.format =
+                FileFormat.fromIdentifier(
+                        FlushingFileFormat.class.getClassLoader(), identifier, new Configuration());
+    }
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
             RowType type, int[][] projection, List<ResolvedExpression> filters) {
-        return avro.createReaderFactory(type, projection, filters);
+        return format.createReaderFactory(type, projection, filters);
     }
 
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
         return fsDataOutputStream -> {
-            BulkWriter<RowData> wrapped = avro.createWriterFactory(type).create(fsDataOutputStream);
+            BulkWriter<RowData> wrapped =
+                    format.createWriterFactory(type).create(fsDataOutputStream);
             return new BulkWriter<RowData>() {
                 @Override
                 public void addElement(RowData rowData) throws IOException {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 10b06c3..8f8bd8c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -90,6 +90,7 @@ public class ManifestCommittableSerializerTest {
 
     public static SstFileMeta newFile(int name, int level) {
         FieldStats[] stats = new FieldStats[] {new FieldStats(0, 1, 0)};
-        return new SstFileMeta(String.valueOf(name), 0, 1, row(0), row(0), stats, 0, 1, level);
+        return new SstFileMeta(
+                String.valueOf(name), 0, 1, row(0), row(0), stats, stats, 0, 1, level);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 9296a31..51084d0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -240,6 +240,7 @@ public class ManifestFileMetaTest {
                         binaryRowData, // not used
                         binaryRowData, // not used
                         new FieldStats[] {new FieldStats(null, null, 0)}, // not used
+                        new FieldStats[] {new FieldStats(null, null, 0)}, // not used
                         0, // not used
                         0, // not used
                         0 // not used
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index f1c8c78..5087125 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -21,9 +21,8 @@ package org.apache.flink.table.store.file.manifest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -124,11 +123,9 @@ public class ManifestFileTest {
 
         // check stats
         for (int i = 0; i < expected.partitionStats().length; i++) {
-            List<FieldStats> actualStats = new ArrayList<>();
-            for (ManifestFileMeta meta : actual) {
-                actualStats.add(meta.partitionStats()[i]);
-            }
-            StatsTestUtils.checkRollingFileStats(expected.partitionStats()[i], actualStats);
+            int idx = i;
+            StatsTestUtils.checkRollingFileStats(
+                    expected.partitionStats()[i], actual, meta -> meta.partitionStats()[idx]);
         }
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 39f2529..a011454 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.table.store.file.manifest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
index 3f6a89b..d2a2059 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -62,6 +62,6 @@ public class LevelsTest {
     }
 
     public static SstFileMeta newFile(int level) {
-        return new SstFileMeta("", 0, 1, row(0), row(0), null, 0, 1, level);
+        return new SstFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, level);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index c6b86b9..18b84cc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -25,9 +25,10 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FlushingFileFormat;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
@@ -37,7 +38,6 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileWriter;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.RecordWriter;
@@ -99,7 +99,7 @@ public class MergeTreeTest {
         options = new MergeTreeOptions(configuration);
         RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType())));
         RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType())));
-        FileFormat flushingAvro = new FlushingAvroFormat();
+        FileFormat flushingAvro = new FlushingFileFormat("avro");
         sstFileReader =
                 new SstFileReader.Factory(keyType, valueType, flushingAvro, pathFactory)
                         .create(BinaryRowDataUtil.EMPTY_ROW, 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 49c3ca5..98687e4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -205,7 +205,16 @@ public class CompactManagerTest {
 
     private static SstFileMeta newFile(int level, int minKey, int maxKey, long maxSequence) {
         return new SstFileMeta(
-                "", maxKey - minKey + 1, 1, row(minKey), row(maxKey), null, 0, maxSequence, level);
+                "",
+                maxKey - minKey + 1,
+                1,
+                row(minKey),
+                row(maxKey),
+                null,
+                null,
+                0,
+                maxSequence,
+                level);
     }
 
     public static BinaryRowData row(int i) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
index f59c1c2..db56649 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
@@ -172,6 +172,7 @@ public class IntervalPartitionTest {
                 minKey,
                 maxKey,
                 new FieldStats[] {new FieldStats(left, right, 0)},
+                new FieldStats[] {new FieldStats(null, null, 0)}, // not used
                 0,
                 24,
                 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index 17ae39a..c7678d7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -223,6 +223,6 @@ public class UniversalCompactionTest {
     }
 
     private SstFileMeta file(long size) {
-        return new SstFileMeta("", size, 1, null, null, null, 0, 0, 0);
+        return new SstFileMeta("", size, 1, null, null, null, null, 0, 0, 0);
     }
 }
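
[Note] The extra null threaded through these test fixtures mirrors the schema change in
SstFileMeta: the commit splits the single stats array into separate key stats and value
stats. A hedged reading of the widened constructor, inferred from the call sites in this
diff (parameter names are illustrative):

    new SstFileMeta(
            fileName,
            fileSize,
            rowCount,
            minKey,
            maxKey,
            keyStats,           // new in this commit; null where a test does not need it
            valueStats,
            minSequenceNumber,
            maxSequenceNumber,
            level);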
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index 00f44e7..3e5acd6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -24,13 +24,13 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.FileFormat;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializerTest;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
@@ -56,14 +56,22 @@ public class SstFileTest {
 
     private final SstTestDataGenerator gen =
             SstTestDataGenerator.builder().memTableCapacity(20).build();
-    private final FileFormat flushingAvro = new FlushingAvroFormat();
 
     @TempDir java.nio.file.Path tempDir;
 
     @RepeatedTest(10)
-    public void testWriteAndReadSstFile() throws Exception {
+    public void testWriteAndReadSstFileWithStatsCollectingRollingFile() throws Exception {
+        testWriteAndReadSstFileImpl("avro");
+    }
+
+    @RepeatedTest(10)
+    public void testWriteAndReadSstFileWithFileExtractingRollingFile() throws Exception {
+        testWriteAndReadSstFileImpl("avro-extract");
+    }
+
+    private void testWriteAndReadSstFileImpl(String format) throws Exception {
         SstTestDataGenerator.Data data = gen.next();
-        SstFileWriter writer = createSstFileWriter(tempDir.toString());
+        SstFileWriter writer = createSstFileWriter(tempDir.toString(), format);
         SstFileMetaSerializer serializer =
                 new SstFileMetaSerializer(
                         TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
@@ -73,7 +81,7 @@ public class SstFileTest {
 
         checkRollingFiles(data.meta, actualMetas, writer.suggestedFileSize());
 
-        SstFileReader reader = createSstFileReader(tempDir.toString(), null, null);
+        SstFileReader reader = createSstFileReader(tempDir.toString(), format, null, null);
         assertData(
                 data,
                 actualMetas,
@@ -90,7 +98,7 @@ public class SstFileTest {
         SstTestDataGenerator.Data data = gen.next();
         SstFileWriter writer =
                 createSstFileWriter(
-                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+                        FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()), "avro");
 
         try {
             writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
@@ -109,7 +117,7 @@ public class SstFileTest {
     @Test
     public void testKeyProjection() throws Exception {
         SstTestDataGenerator.Data data = gen.next();
-        SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+        SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString(), "avro");
         SstFileMetaSerializer serializer =
                 new SstFileMetaSerializer(
                         TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
@@ -118,7 +126,7 @@ public class SstFileTest {
 
         // projection: (shopId, orderId) -> (orderId)
         SstFileReader sstFileReader =
-                createSstFileReader(tempDir.toString(), new int[][] {new int[] {1}}, null);
+                createSstFileReader(tempDir.toString(), "avro", new int[][] {new int[] {1}}, null);
         RowType projectedKeyType =
                 RowType.of(new LogicalType[] {new BigIntType(false)}, new String[] {"key_orderId"});
         RowDataSerializer projectedKeySerializer = new RowDataSerializer(projectedKeyType);
@@ -141,7 +149,7 @@ public class SstFileTest {
     @Test
     public void testValueProjection() throws Exception {
         SstTestDataGenerator.Data data = gen.next();
-        SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+        SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString(), "avro");
         SstFileMetaSerializer serializer =
                 new SstFileMetaSerializer(
                         TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
@@ -154,6 +162,7 @@ public class SstFileTest {
         SstFileReader sstFileReader =
                 createSstFileReader(
                         tempDir.toString(),
+                        "avro",
                         null,
                         new int[][] {new int[] {2}, new int[] {4}, new int[] {0}, new int[] {1}});
         RowType projectedValueType =
@@ -188,29 +197,29 @@ public class SstFileTest {
                                                 kv.value().getInt(1))));
     }
 
-    private SstFileWriter createSstFileWriter(String path) {
+    private SstFileWriter createSstFileWriter(String path, String format) {
         FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
         return new SstFileWriter.Factory(
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.ROW_TYPE,
-                        // normal avro format will buffer changes in memory and we can't determine
+                        // normal format will buffer changes in memory and we can't determine
                         // if the written file size is really larger than suggested, so we use a
-                        // special avro format which flushes for every added element
-                        flushingAvro,
+                        // special format which flushes for every added element
+                        new FlushingFileFormat(format),
                         pathFactory,
                         suggestedFileSize)
                 .create(BinaryRowDataUtil.EMPTY_ROW, 0);
     }
 
     private SstFileReader createSstFileReader(
-            String path, int[][] keyProjection, int[][] valueProjection) {
+            String path, String format, int[][] keyProjection, int[][] valueProjection) {
         FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
         SstFileReader.Factory factory =
                 new SstFileReader.Factory(
                         TestKeyValueGenerator.KEY_TYPE,
                         TestKeyValueGenerator.ROW_TYPE,
-                        flushingAvro,
+                        new FlushingFileFormat(format),
                         pathFactory);
         if (keyProjection != null) {
             factory.withKeyProjection(keyProjection);
@@ -272,16 +281,17 @@ public class SstFileTest {
         // expected.maxKey == lastFile.maxKey
         assertThat(actual.get(actual.size() - 1).maxKey()).isEqualTo(expected.maxKey());
 
-        // TODO check stats after they're collected
-        /*
-        for (int i = 0; i < expected.stats().length; i++) {
-            List<FieldStats> actualStats = new ArrayList<>();
-            for (SstFileMeta meta : actual) {
-                actualStats.add(meta.stats()[i]);
-            }
-            checkRollingFileStats(expected.stats()[i], actualStats);
+        // check stats
+        for (int i = 0; i < expected.keyStats().length; i++) {
+            int idx = i;
+            StatsTestUtils.checkRollingFileStats(
+                    expected.keyStats()[i], actual, m -> m.keyStats()[idx]);
+        }
+        for (int i = 0; i < expected.valueStats().length; i++) {
+            int idx = i;
+            StatsTestUtils.checkRollingFileStats(
+                    expected.valueStats()[i], actual, m -> m.valueStats()[idx]);
         }
-        */
 
         // expected.minSequenceNumber == min(minSequenceNumber)
         assertThat(actual.stream().mapToLong(SstFileMeta::minSequenceNumber).min().orElse(-1))
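
[Note] The two new rolling-file tests above exercise the two stats paths this commit adds:
with "avro" the writer has to collect field stats while writing, while "avro-extract"
supplies a FileStatsExtractor that reads them back from the finished file. A hedged
sketch of the decision the writer presumably makes when a rolled file is closed; the
names are illustrative, not the verbatim SstFileWriter code:

    // inside the writer, after a rolled file has been finished on disk:
    FieldStats[] stats =
            statsExtractor != null
                    ? statsExtractor.extract(path)   // read stats from the file's own metadata
                    : statsCollector.extract();      // stats gathered element by element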
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
index bb39d7d..5d5edc4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
@@ -98,7 +98,10 @@ public class SstTestDataGenerator {
     }
 
     private Data createSstFile(List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
-        FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+        FieldStatsCollector keyStatsCollector =
+                new FieldStatsCollector(TestKeyValueGenerator.KEY_TYPE);
+        FieldStatsCollector valueStatsCollector =
+                new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
         long totalSize = 0;
         BinaryRowData minKey = null;
         BinaryRowData maxKey = null;
@@ -108,7 +111,8 @@ public class SstTestDataGenerator {
             BinaryRowData key = (BinaryRowData) kv.key();
             BinaryRowData value = (BinaryRowData) kv.value();
             totalSize += key.getSizeInBytes() + value.getSizeInBytes();
-            collector.collect(value);
+            keyStatsCollector.collect(key);
+            valueStatsCollector.collect(value);
             if (minKey == null || TestKeyValueGenerator.KEY_COMPARATOR.compare(key, minKey) < 0) {
                 minKey = key;
             }
@@ -128,7 +132,8 @@ public class SstTestDataGenerator {
                         kvs.size(),
                         minKey,
                         maxKey,
-                        collector.extract(),
+                        keyStatsCollector.extract(),
+                        valueStatsCollector.extract(),
                         minSequenceNumber,
                         maxSequenceNumber,
                         level),
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
new file mode 100644
index 0000000..b1143ec
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStatsExtractor}. */
+public abstract class FileStatsExtractorTestBase {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testExtract() throws Exception {
+        FileFormat format = createFormat();
+        RowType rowType = rowType();
+
+        BulkWriter.Factory<RowData> writerFactory = format.createWriterFactory(rowType);
+        Path path = new Path(tempDir.toString() + "/test");
+        FSDataOutputStream out =
+                path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+        BulkWriter<RowData> writer = writerFactory.create(out);
+
+        List<GenericRowData> data = createData(rowType);
+        for (GenericRowData row : data) {
+            writer.addElement(row);
+        }
+        writer.finish();
+
+        FieldStatsCollector collector = new FieldStatsCollector(rowType);
+        for (GenericRowData row : data) {
+            collector.collect(row);
+        }
+        FieldStats[] expected = collector.extract();
+
+        FileStatsExtractor extractor = format.createStatsExtractor(rowType).get();
+        assertThat(extractor).isNotNull();
+        FieldStats[] actual = extractor.extract(path);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private List<GenericRowData> createData(RowType rowType) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int numRows = random.nextInt(100);
+        List<List<Object>> columns = new ArrayList<>();
+        for (RowType.RowField field : rowType.getFields()) {
+            List<Object> column = new ArrayList<>();
+            int numValues = random.nextInt(numRows + 1);
+            for (int i = 0; i < numValues; i++) {
+                column.add(createField(field.getType()));
+            }
+            columns.add(column);
+        }
+        return createData(numRows, columns);
+    }
+
+    private List<GenericRowData> createData(int numRows, List<List<Object>> columns) {
+        List<GenericRowData> rows = new ArrayList<>();
+        for (int i = 0; i < numRows; i++) {
+            rows.add(new GenericRowData(columns.size()));
+        }
+        for (int i = 0; i < columns.size(); i++) {
+            List<?> objects = new ArrayList<>(columns.get(i));
+            while (objects.size() < numRows) {
+                objects.add(null);
+            }
+            Collections.shuffle(objects);
+            for (int j = 0; j < numRows; j++) {
+                rows.get(j).setField(i, objects.get(j));
+            }
+        }
+        return rows;
+    }
+
+    private Object createField(LogicalType type) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        switch (type.getTypeRoot()) {
+            case CHAR:
+                CharType charType = (CharType) type;
+                return BinaryStringData.fromString(randomString(charType.getLength()));
+            case VARCHAR:
+                VarCharType varCharType = (VarCharType) type;
+                return BinaryStringData.fromString(
+                        randomString(random.nextInt(varCharType.getLength()) + 1));
+            case BOOLEAN:
+                return random.nextBoolean();
+            case TINYINT:
+                return (byte) random.nextInt(10);
+            case SMALLINT:
+                return (short) random.nextInt(100);
+            case INTEGER:
+                return random.nextInt(1000);
+            case BIGINT:
+                return random.nextLong(10000);
+            case FLOAT:
+                return random.nextFloat();
+            case DOUBLE:
+                return random.nextDouble();
+            case DECIMAL:
+                return randomDecimalData((DecimalType) type);
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return random.nextInt(10000);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return randomTimestampData((TimestampType) type);
+            case ARRAY:
+                return randomArray((ArrayType) type);
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported type " + type.asSummaryString());
+        }
+    }
+
+    private String randomString(int length) {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            builder.append((char) ThreadLocalRandom.current().nextInt(127 - 32));
+        }
+        return builder.toString();
+    }
+
+    private DecimalData randomDecimalData(DecimalType type) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int p = type.getPrecision();
+        int s = type.getScale();
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < p - s; i++) {
+            builder.append((char) (random.nextInt(10) + '0'));
+        }
+        if (s > 0) {
+            builder.append('.');
+            for (int i = 0; i < s; i++) {
+                builder.append((char) (random.nextInt(10) + '0'));
+            }
+        }
+        return DecimalData.fromBigDecimal(new BigDecimal(builder.toString()), p, s);
+    }
+
+    private TimestampData randomTimestampData(TimestampType type) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        long p = 1;
+        for (int i = type.getPrecision(); i < TimestampType.MAX_PRECISION; i++) {
+            p *= 10;
+        }
+        long currentSecond = System.currentTimeMillis() / 1000;
+        return TimestampData.fromInstant(
+                Instant.ofEpochSecond(
+                        random.nextLong(currentSecond), random.nextLong(1_000_000_000) / p * p));
+    }
+
+    private ArrayData randomArray(ArrayType type) {
+        int length = ThreadLocalRandom.current().nextInt(10);
+        Object[] javaArray = new Object[length];
+        for (int i = 0; i < length; i++) {
+            javaArray[i] = createField(type.getElementType());
+        }
+        return new GenericArrayData(javaArray);
+    }
+
+    protected abstract FileFormat createFormat();
+
+    protected abstract RowType rowType();
+}
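
[Note] Concrete subclasses of this base only provide the format and the schema. Besides
OrcFileStatsExtractorTest at the end of this commit, a hypothetical Avro counterpart
could look like the sketch below; the "avro-extract" identifier is an assumption taken
from its use in SstFileTest above, presumably resolving to the
FileStatsExtractingAvroFormatFactory registered in this commit's test services file:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.store.file.stats.FileStatsExtractorTestBase;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    /** Hypothetical Avro counterpart of OrcFileStatsExtractorTest. */
    public class AvroFileStatsExtractorTest extends FileStatsExtractorTestBase {

        @Override
        protected FileFormat createFormat() {
            return FileFormat.fromIdentifier(
                    AvroFileStatsExtractorTest.class.getClassLoader(),
                    "avro-extract", // assumed identifier
                    new Configuration());
        }

        @Override
        protected RowType rowType() {
            return RowType.of(new IntType(), new VarCharType(8));
        }
    }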
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
index 379b600..4ce3255 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.store.file.stats;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -26,7 +28,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class StatsTestUtils {
 
     @SuppressWarnings("unchecked")
-    public static void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
+    public static <T> void checkRollingFileStats(
+            FieldStats expected, List<T> actualObjects, Function<T, FieldStats> mapToStats) {
+        List<FieldStats> actual = new ArrayList<>();
+        for (T object : actualObjects) {
+            actual.add(mapToStats.apply(object));
+        }
         if (expected.minValue() instanceof Comparable) {
             Object actualMin = null;
             Object actualMax = null;
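
[Note] With the new Function parameter, callers can point this utility at either
keyStats() or valueStats() of SstFileMeta, as SstFileTest does above. Conceptually it
folds the per-file stats into one FieldStats and compares that with the expected
whole-data stats. A simplified sketch of the reduction, ignoring the non-Comparable and
all-null branches the real code also handles (the nullCount() accessor is assumed from
the FieldStats constructor shown in this diff):

    Object min = null;
    Object max = null;
    long nullCount = 0;
    for (FieldStats stats : actual) {
        if (min == null || ((Comparable<Object>) stats.minValue()).compareTo(min) < 0) {
            min = stats.minValue();
        }
        if (max == null || ((Comparable<Object>) stats.maxValue()).compareTo(max) > 0) {
            max = stats.maxValue();
        }
        nullCount += stats.nullCount();
    }
    assertThat(new FieldStats(min, max, nullCount)).isEqualTo(expected);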
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
new file mode 100644
index 0000000..751ea18
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link FileStatsExtractor} for test. It reads all records from the file and uses {@link
+ * FieldStatsCollector} to collect the stats.
+ */
+public class TestFileStatsExtractor implements FileStatsExtractor {
+
+    private final FileFormat format;
+    private final RowType rowType;
+
+    public TestFileStatsExtractor(FileFormat format, RowType rowType) {
+        this.format = format;
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        IdentityObjectSerializer serializer = new IdentityObjectSerializer(rowType);
+        BulkFormat<RowData, FileSourceSplit> readerFactory = format.createReaderFactory(rowType);
+        List<RowData> records = FileUtils.readListFromFile(path, serializer, readerFactory);
+        FieldStatsCollector statsCollector = new FieldStatsCollector(rowType);
+        for (RowData record : records) {
+            statsCollector.collect(record);
+        }
+        return statsCollector.extract();
+    }
+
+    private static class IdentityObjectSerializer extends ObjectSerializer<RowData> {
+
+        public IdentityObjectSerializer(RowType rowType) {
+            super(rowType);
+        }
+
+        @Override
+        public RowData toRow(RowData record) {
+            return record;
+        }
+
+        @Override
+        public RowData fromRow(RowData rowData) {
+            return rowData;
+        }
+    }
+}
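
[Note] TestFileStatsExtractor is the brute-force reference implementation: real
extractors read file metadata, while this one re-reads every record and feeds it through
the same FieldStatsCollector the writers use. Presumably the
FileStatsExtractingAvroFormat in this commit's file list wires it in roughly like this
(a sketch, not the verbatim class):

    @Override
    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
        // reads and writes still go through plain avro; only stats extraction is added
        return Optional.of(new TestFileStatsExtractor(this, type));
    }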
diff --git a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
new file mode 100644
index 0000000..7bf0c99
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.file.format.FileStatsExtractingAvroFormatFactory
\ No newline at end of file
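
[Note] The two META-INF/services files in this commit (this one and the ORC one below)
are what let FileFormat.fromIdentifier find factories across modules at runtime. A
minimal sketch of how such a ServiceLoader lookup typically resolves an identifier to a
factory; the real resolution lives in FileFormat.fromIdentifier and may differ in detail:

    import java.util.ServiceLoader;

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.store.file.format.FileFormatFactory;

    final class FormatLookupSketch {
        static FileFormat fromIdentifier(ClassLoader cl, String identifier, ReadableConfig options) {
            // scan all factories registered in META-INF/services files on the classpath
            for (FileFormatFactory factory : ServiceLoader.load(FileFormatFactory.class, cl)) {
                if (factory.identifier().equals(identifier)) {
                    return factory.create(cl, options);
                }
            }
            throw new IllegalArgumentException("No FileFormatFactory for identifier: " + identifier);
        }
    }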
diff --git a/flink-table-store-orc/pom.xml b/flink-table-store-orc/pom.xml
new file mode 100644
index 0000000..a7a6da8
--- /dev/null
+++ b/flink-table-store-orc/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-orc</artifactId>
+    <name>Flink Table Store : Orc</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- flink dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-orc</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
similarity index 50%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
rename to flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
index a6ff06b..0087cc3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++ b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,55 +16,44 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.orc;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormatImpl;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
-/** A special avro {@link FileFormat} which flushes for every added element. */
-public class FlushingAvroFormat implements FileFormat {
+/** Orc {@link FileFormat}. */
+public class OrcFileFormat implements FileFormat {
 
-    private final FileFormat avro =
-            FileFormat.fromIdentifier(
-                    SstFileTest.class.getClassLoader(), "avro", new Configuration());
+    private final FileFormatImpl format;
+
+    public OrcFileFormat(ClassLoader classLoader, ReadableConfig formatOptions) {
+        this.format = new FileFormatImpl(classLoader, "orc", formatOptions);
+    }
 
     @Override
     public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
             RowType type, int[][] projection, List<ResolvedExpression> filters) {
-        return avro.createReaderFactory(type, projection, filters);
+        return format.createReaderFactory(type, projection, filters);
     }
 
     @Override
     public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
-        return fsDataOutputStream -> {
-            BulkWriter<RowData> wrapped = avro.createWriterFactory(type).create(fsDataOutputStream);
-            return new BulkWriter<RowData>() {
-                @Override
-                public void addElement(RowData rowData) throws IOException {
-                    wrapped.addElement(rowData);
-                    wrapped.flush();
-                }
-
-                @Override
-                public void flush() throws IOException {
-                    wrapped.flush();
-                }
+        return format.createWriterFactory(type);
+    }
 
-                @Override
-                public void finish() throws IOException {
-                    wrapped.finish();
-                }
-            };
-        };
+    @Override
+    public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+        return Optional.of(new OrcFileStatsExtractor(type));
     }
 }
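
[Note] OrcFileFormat keeps no ORC-specific read or write code: it delegates both
directions to the generic FileFormatImpl under the "orc" identifier and only contributes
createStatsExtractor. Callers therefore obtain it like any other format; a small usage
sketch (the rowType variable is assumed to be in scope):

    FileFormat orc = FileFormat.fromIdentifier(
            OrcFileFormat.class.getClassLoader(), "orc", new Configuration());
    // reads and writes are served by the delegated flink-orc implementation;
    // only the stats extractor is ORC-specific:
    FileStatsExtractor extractor = orc.createStatsExtractor(rowType).get();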
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
new file mode 100644
index 0000000..c31dd10
--- /dev/null
+++ b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.orc;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormatFactory;
+
+/** Factory to create {@link OrcFileFormat}. */
+public class OrcFileFormatFactory implements FileFormatFactory {
+
+    @Override
+    public String identifier() {
+        return "orc";
+    }
+
+    @Override
+    public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
+        return new OrcFileFormat(classLoader, formatOptions);
+    }
+}
diff --git a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
new file mode 100644
index 0000000..8d321af
--- /dev/null
+++ b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.orc;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+    private final RowType rowType;
+
+    public OrcFileStatsExtractor(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public FieldStats[] extract(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+        Reader reader =
+                OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+        long rowCount = reader.getNumberOfRows();
+        ColumnStatistics[] columnStatistics = reader.getStatistics();
+        TypeDescription schema = reader.getSchema();
+        List<String> columnNames = schema.getFieldNames();
+        List<TypeDescription> columnTypes = schema.getChildren();
+
+        return IntStream.range(0, rowType.getFieldCount())
+                .mapToObj(
+                        i -> {
+                            RowType.RowField field = rowType.getFields().get(i);
+                            int fieldIdx = columnNames.indexOf(field.getName());
+                            int colId = columnTypes.get(fieldIdx).getId();
+                            return toFieldStats(field, columnStatistics[colId], rowCount);
+                        })
+                .toArray(FieldStats[]::new);
+    }
+
+    private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+        long nullCount = rowCount - stats.getNumberOfValues();
+        if (nullCount == rowCount) {
+            // all nulls
+            return new FieldStats(null, null, nullCount);
+        }
+        Preconditions.checkState(
+                (nullCount > 0) == stats.hasNull(),
+                "Bug in OrcFileStatsExtractor: nullCount is "
+                        + nullCount
+                        + " while stats.hasNull() is "
+                        + stats.hasNull()
+                        + "!");
+
+        switch (field.getType().getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                assertStatsClass(field, stats, StringColumnStatistics.class);
+                StringColumnStatistics stringStats = (StringColumnStatistics) stats;
+                return new FieldStats(
+                        StringData.fromString(stringStats.getMinimum()),
+                        StringData.fromString(stringStats.getMaximum()),
+                        nullCount);
+            case BOOLEAN:
+                assertStatsClass(field, stats, BooleanColumnStatistics.class);
+                BooleanColumnStatistics boolStats = (BooleanColumnStatistics) stats;
+                return new FieldStats(
+                        boolStats.getFalseCount() == 0, boolStats.getTrueCount() != 0, nullCount);
+            case DECIMAL:
+                assertStatsClass(field, stats, DecimalColumnStatistics.class);
+                DecimalColumnStatistics decimalStats = (DecimalColumnStatistics) stats;
+                DecimalType decimalType = (DecimalType) (field.getType());
+                int precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                return new FieldStats(
+                        DecimalData.fromBigDecimal(
+                                decimalStats.getMinimum().bigDecimalValue(), precision, scale),
+                        DecimalData.fromBigDecimal(
+                                decimalStats.getMaximum().bigDecimalValue(), precision, scale),
+                        nullCount);
+            case TINYINT:
+                assertStatsClass(field, stats, IntegerColumnStatistics.class);
+                IntegerColumnStatistics byteStats = (IntegerColumnStatistics) stats;
+                return new FieldStats(
+                        (byte) byteStats.getMinimum(), (byte) byteStats.getMaximum(), nullCount);
+            case SMALLINT:
+                assertStatsClass(field, stats, IntegerColumnStatistics.class);
+                IntegerColumnStatistics shortStats = (IntegerColumnStatistics) stats;
+                return new FieldStats(
+                        (short) shortStats.getMinimum(),
+                        (short) shortStats.getMaximum(),
+                        nullCount);
+            case INTEGER:
+                assertStatsClass(field, stats, IntegerColumnStatistics.class);
+                IntegerColumnStatistics intStats = (IntegerColumnStatistics) stats;
+                return new FieldStats(
+                        Long.valueOf(intStats.getMinimum()).intValue(),
+                        Long.valueOf(intStats.getMaximum()).intValue(),
+                        nullCount);
+            case BIGINT:
+                assertStatsClass(field, stats, IntegerColumnStatistics.class);
+                IntegerColumnStatistics longStats = (IntegerColumnStatistics) stats;
+                return new FieldStats(longStats.getMinimum(), longStats.getMaximum(), nullCount);
+            case FLOAT:
+                assertStatsClass(field, stats, DoubleColumnStatistics.class);
+                DoubleColumnStatistics floatStats = (DoubleColumnStatistics) stats;
+                return new FieldStats(
+                        (float) floatStats.getMinimum(),
+                        (float) floatStats.getMaximum(),
+                        nullCount);
+            case DOUBLE:
+                assertStatsClass(field, stats, DoubleColumnStatistics.class);
+                DoubleColumnStatistics doubleStats = (DoubleColumnStatistics) stats;
+                return new FieldStats(
+                        doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
+            case DATE:
+                assertStatsClass(field, stats, DateColumnStatistics.class);
+                DateColumnStatistics dateStats = (DateColumnStatistics) stats;
+                return new FieldStats(
+                        DateTimeUtils.toInternal(new Date(dateStats.getMinimum().getTime())),
+                        DateTimeUtils.toInternal(new Date(dateStats.getMaximum().getTime())),
+                        nullCount);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                assertStatsClass(field, stats, TimestampColumnStatistics.class);
+                TimestampColumnStatistics timestampStats = (TimestampColumnStatistics) stats;
+                return new FieldStats(
+                        TimestampData.fromTimestamp(timestampStats.getMinimum()),
+                        TimestampData.fromTimestamp(timestampStats.getMaximum()),
+                        nullCount);
+            default:
+                return new FieldStats(null, null, nullCount);
+        }
+    }
+
+    private void assertStatsClass(
+            RowType.RowField field,
+            ColumnStatistics stats,
+            Class<? extends ColumnStatistics> expectedClass) {
+        if (!expectedClass.isInstance(stats)) {
+            throw new IllegalArgumentException(
+                    "Expecting "
+                            + expectedClass.getName()
+                            + " for field "
+                            + field.asSummaryString()
+                            + " but found "
+                            + stats.getClass().getName());
+        }
+    }
+}
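
[Note] The extractor above derives everything from the ORC footer
(reader.getStatistics()), so no row data is scanned. A hedged end-to-end sketch of using
it directly; the path is hypothetical, rowType is assumed to match the written file, and
the FieldStats accessor names follow StatsTestUtils above:

    FieldStats[] stats = new OrcFileStatsExtractor(rowType)
            .extract(new Path("/tmp/bucket-0/data.orc")); // hypothetical file
    for (FieldStats fieldStats : stats) {
        // minValue()/maxValue() hold Flink internal types (StringData, DecimalData, ...)
        System.out.println(fieldStats.minValue() + " .. " + fieldStats.maxValue());
    }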
diff --git a/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory b/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
new file mode 100644
index 0000000..354bcc0
--- /dev/null
+++ b/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.orc.OrcFileFormatFactory
\ No newline at end of file
diff --git a/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java b/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
new file mode 100644
index 0000000..82d9271
--- /dev/null
+++ b/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.orc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.stats.FileStatsExtractorTestBase;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** Tests for {@link OrcFileStatsExtractor}. */
+public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
+
+    @Override
+    protected FileFormat createFormat() {
+        return FileFormat.fromIdentifier(
+                OrcFileStatsExtractorTest.class.getClassLoader(), "orc", new Configuration());
+    }
+
+    @Override
+    protected RowType rowType() {
+        return RowType.of(
+                new CharType(8),
+                new VarCharType(8),
+                new BooleanType(),
+                new TinyIntType(),
+                new SmallIntType(),
+                new IntType(),
+                new BigIntType(),
+                new FloatType(),
+                new DoubleType(),
+                new DecimalType(5, 2),
+                new DecimalType(38, 18),
+                new DateType(),
+                new TimestampType(3),
+                // orc reader & writer currently cannot preserve a high 
precision timestamp
+                // new TimestampType(9),
+                new ArrayType(new IntType()));
+    }
+}
diff --git a/pom.xml b/pom.xml
index 7d048b1..bb5620b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,10 +55,12 @@ under the License.
         <module>flink-table-store-core</module>
         <module>flink-table-store-connector</module>
         <module>flink-table-store-kafka</module>
+        <module>flink-table-store-orc</module>
     </modules>
 
     <properties>
         <flink.version>1.15-SNAPSHOT</flink.version>
+        <hadoop.version>2.8.5</hadoop.version>
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>1.7.15</slf4j.version>
         <log4j.version>2.17.1</log4j.version>
