This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 97ca28c4 [FLINK-31329] Fix Parquet stats extractor
97ca28c4 is described below

commit 97ca28c4f97ed705b0ae27cd0b30954db1dd6a18
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 6 17:01:21 2023 +0800

    [FLINK-31329] Fix Parquet stats extractor
    
    This closes #582
---
 .../apache/flink/table/store/data/Timestamp.java   |  17 ++
 .../table/store/data/columnar/ColumnarMap.java     |   5 +
 .../flink/table/store/file/predicate/In.java       |   3 +-
 .../table/store/file/predicate/IsNotNull.java      |   3 +-
 .../flink/table/store/file/predicate/IsNull.java   |   3 +-
 .../table/store/file/predicate/LeafPredicate.java  |   3 +-
 .../flink/table/store/file/predicate/NotIn.java    |   3 +-
 .../predicate/NullFalseLeafBinaryFunction.java     |   7 +-
 .../flink/table/store/format/FieldStats.java       |  12 +-
 .../flink/table/store/data/TimestampTest.java      |   9 +
 .../store/format/FileStatsExtractorTestBase.java   |   7 +
 flink-table-store-core/pom.xml                     |  29 ++++
 .../flink/table/store/file/io/DataFileMeta.java    |   2 +-
 .../table/store/file/stats/BinaryTableStats.java   |  16 +-
 .../file/stats/FieldStatsArraySerializer.java      |   7 +-
 .../store/file/predicate/PredicateBuilderTest.java |  20 +--
 .../table/store/file/predicate/PredicateTest.java  | 187 +++++++++++----------
 .../store/file/stats/BinaryTableStatsTest.java     |   2 +-
 .../file/stats/FieldStatsArraySerializerTest.java  |   2 +-
 .../store/file/stats/FieldStatsCollectorTest.java  |  24 +--
 .../table/store/file/stats/StatsTestUtils.java     |   4 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |   4 +
 .../table/store/table/FileStoreTableTestBase.java  |   4 +
 .../format/parquet/ParquetFileStatsExtractor.java  |  72 ++++----
 .../store/format/parquet/ParquetReaderFactory.java |  17 +-
 .../format/parquet/ParquetSchemaConverter.java     |  63 ++++---
 .../table/store/format/parquet/ParquetUtil.java    |  10 +-
 .../format/parquet/reader/ArrayColumnReader.java   |  42 +++--
 .../parquet/reader/ParquetSplitReaderUtil.java     |  21 ++-
 .../parquet/reader/ParquetTimestampVector.java     |  63 +++++++
 .../parquet/writer/ParquetRowDataWriter.java       | 131 ++++++++++-----
 .../parquet/ParquetFileStatsExtractorTest.java     |  37 +++-
 .../store/format/parquet/ParquetReadWriteTest.java |  88 ++++++----
 33 files changed, 618 insertions(+), 299 deletions(-)

diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java
index f4c0ce62..bb155ff2 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java
@@ -47,6 +47,10 @@ public final class Timestamp implements 
Comparable<Timestamp>, Serializable {
     // the number of milliseconds in a day
     private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 
1000
 
+    public static final long MICROS_PER_MILLIS = 1000L;
+
+    public static final long NANOS_PER_MICROS = 1000L;
+
     // this field holds the integral second and the milli-of-second
     private final long millisecond;
 
@@ -104,6 +108,12 @@ public final class Timestamp implements 
Comparable<Timestamp>, Serializable {
         return Instant.ofEpochSecond(epochSecond, nanoAdjustment);
     }
 
+    /** Converts this {@link Timestamp} object to micros. */
+    public long toMicros() {
+        long micros = Math.multiplyExact(millisecond, MICROS_PER_MILLIS);
+        return micros + nanoOfMillisecond / NANOS_PER_MICROS;
+    }
+
     @Override
     public int compareTo(Timestamp that) {
         int cmp = Long.compare(this.millisecond, that.millisecond);
@@ -205,6 +215,13 @@ public final class Timestamp implements 
Comparable<Timestamp>, Serializable {
         return new Timestamp(millisecond, nanoOfMillisecond);
     }
 
+    /** Creates an instance of {@link Timestamp} from micros. */
+    public static Timestamp fromMicros(long micros) {
+        long mills = Math.floorDiv(micros, MICROS_PER_MILLIS);
+        long nanos = (micros - mills * MICROS_PER_MILLIS) * NANOS_PER_MICROS;
+        return Timestamp.fromEpochMillis(mills, (int) nanos);
+    }
+
     /**
      * Returns whether the timestamp data is small enough to be stored in a 
long of milliseconds.
      */
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java
index 6402fd3b..14170214 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java
@@ -70,4 +70,9 @@ public final class ColumnarMap implements InternalMap, 
Serializable {
         throw new UnsupportedOperationException(
                 "ColumnarMapData do not support hashCode, please hash fields 
one by one!");
     }
+
+    @Override
+    public String toString() {
+        return getClass().getName() + "@" + 
Integer.toHexString(super.hashCode());
+    }
 }
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
index 9121554d..224dfdb4 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -51,7 +51,8 @@ public class In extends LeafFunction {
     @Override
     public boolean test(
             DataType type, long rowCount, FieldStats fieldStats, List<Object> 
literals) {
-        if (rowCount == fieldStats.nullCount()) {
+        Long nullCount = fieldStats.nullCount();
+        if (nullCount != null && rowCount == nullCount) {
             return false;
         }
         for (Object literal : literals) {
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index 14d1fcd0..19cab378 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -38,7 +38,8 @@ public class IsNotNull extends LeafUnaryFunction {
 
     @Override
     public boolean test(DataType type, long rowCount, FieldStats fieldStats) {
-        return fieldStats.nullCount() < rowCount;
+        Long nullCount = fieldStats.nullCount();
+        return nullCount == null || nullCount < rowCount;
     }
 
     @Override
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index 2a88bf1e..5f819a59 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -38,7 +38,8 @@ public class IsNull extends LeafUnaryFunction {
 
     @Override
     public boolean test(DataType type, long rowCount, FieldStats fieldStats) {
-        return fieldStats.nullCount() > 0;
+        Long nullCount = fieldStats.nullCount();
+        return nullCount == null || nullCount > 0;
     }
 
     @Override
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 6f77383e..6d179b53 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -90,7 +90,8 @@ public class LeafPredicate implements Predicate {
     @Override
     public boolean test(long rowCount, FieldStats[] fieldStats) {
         FieldStats stats = fieldStats[fieldIndex];
-        if (rowCount != stats.nullCount()) {
+        Long nullCount = stats.nullCount();
+        if (nullCount == null || rowCount != nullCount) {
             // not all null
             // min or max is null
             // unknown stats
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
index 15e47c8c..5be1b4f1 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -51,7 +51,8 @@ public class NotIn extends LeafFunction {
     @Override
     public boolean test(
             DataType type, long rowCount, FieldStats fieldStats, List<Object> 
literals) {
-        if (rowCount == fieldStats.nullCount()) {
+        Long nullCount = fieldStats.nullCount();
+        if (nullCount != null && rowCount == nullCount) {
             return false;
         }
         for (Object literal : literals) {
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
index ae620941..24e2af3d 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
@@ -44,8 +44,11 @@ public abstract class NullFalseLeafBinaryFunction extends 
LeafFunction {
     @Override
     public boolean test(
             DataType type, long rowCount, FieldStats fieldStats, List<Object> 
literals) {
-        if (rowCount == fieldStats.nullCount() || literals.get(0) == null) {
-            return false;
+        Long nullCount = fieldStats.nullCount();
+        // A null literal never matches; "all rows null" is provable only with a known count.
+        boolean allNull = nullCount != null && rowCount == nullCount;
+        if (allNull || literals.get(0) == null) {
+            return false;
         }
         return test(type, rowCount, fieldStats, literals.get(0));
     }
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
index d2ec5467..78c9a678 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
@@ -27,23 +27,27 @@ public class FieldStats {
 
     @Nullable private final Object minValue;
     @Nullable private final Object maxValue;
-    private final long nullCount;
+    private final Long nullCount;
 
-    public FieldStats(@Nullable Object minValue, @Nullable Object maxValue, 
long nullCount) {
+    public FieldStats(
+            @Nullable Object minValue, @Nullable Object maxValue, @Nullable 
Long nullCount) {
         this.minValue = minValue;
         this.maxValue = maxValue;
         this.nullCount = nullCount;
     }
 
+    @Nullable
     public Object minValue() {
         return minValue;
     }
 
+    @Nullable
     public Object maxValue() {
         return maxValue;
     }
 
-    public long nullCount() {
+    @Nullable
+    public Long nullCount() {
         return nullCount;
     }
 
@@ -55,7 +59,7 @@ public class FieldStats {
         FieldStats that = (FieldStats) o;
         return Objects.equals(minValue, that.minValue)
                 && Objects.equals(maxValue, that.maxValue)
-                && nullCount == that.nullCount;
+                && Objects.equals(nullCount, that.nullCount);
     }
 
     @Override
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java
index 359d32a7..a704427a 100644
--- 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java
@@ -130,4 +130,13 @@ public class TimestampTest {
         assertThat(Timestamp.fromInstant(instant).toString())
                 .isEqualTo("1970-01-01T00:00:00.123456789");
     }
+
+    @Test
+    public void testToMicros() {
+        java.sql.Timestamp t = java.sql.Timestamp.valueOf("2005-01-02 
00:00:00.123456789");
+        assertThat(Timestamp.fromSQLTimestamp(t).toString())
+                .isEqualTo("2005-01-02T00:00:00.123456789");
+        
assertThat(Timestamp.fromMicros(Timestamp.fromSQLTimestamp(t).toMicros()).toString())
+                .isEqualTo("2005-01-02T00:00:00.123456");
+    }
 }
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 557564f7..ceec7595 100644
--- 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -90,9 +90,16 @@ public abstract class FileStatsExtractorTestBase {
         FileStatsExtractor extractor = 
format.createStatsExtractor(rowType).get();
         assertThat(extractor).isNotNull();
         FieldStats[] actual = extractor.extract(fileIO, path);
+        for (int i = 0; i < expected.length; i++) {
+            expected[i] = regenerate(expected[i], rowType.getTypeAt(i));
+        }
         assertThat(actual).isEqualTo(expected);
     }
 
+    protected FieldStats regenerate(FieldStats stats, DataType type) {
+        return stats;
+    }
+
     private List<GenericRow> createData(RowType rowType) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int numRows = random.nextInt(1, 100);
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index 684b616a..15ccd5cc 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -116,6 +116,35 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java
index 22f070f9..9229ae5d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java
@@ -46,7 +46,7 @@ public class DataFileMeta {
     // Append only data files don't have any key columns and meaningful level 
value. it will use
     // the following dummy values.
     public static final BinaryTableStats EMPTY_KEY_STATS =
-            new BinaryTableStats(EMPTY_ROW, EMPTY_ROW, new long[0]);
+            new BinaryTableStats(EMPTY_ROW, EMPTY_ROW, new Long[0]);
     public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
     public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
     public static final int DUMMY_LEVEL = 0;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
index accf24e8..b4586e69 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.stats;
 import org.apache.flink.table.store.data.BinaryRow;
 import org.apache.flink.table.store.data.GenericArray;
 import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalArray;
 import org.apache.flink.table.store.data.InternalRow;
 import org.apache.flink.table.store.format.FieldStats;
 
@@ -40,20 +41,20 @@ public class BinaryTableStats {
     @Nullable private FieldStats[] cacheArray;
     @Nullable private BinaryRow cacheMin;
     @Nullable private BinaryRow cacheMax;
-    @Nullable private long[] cacheNullCounts;
+    @Nullable private Long[] cacheNullCounts;
 
     public BinaryTableStats(InternalRow row) {
         this.row = row;
     }
 
-    public BinaryTableStats(BinaryRow cacheMin, BinaryRow cacheMax, long[] 
cacheNullCounts) {
+    public BinaryTableStats(BinaryRow cacheMin, BinaryRow cacheMax, Long[] 
cacheNullCounts) {
         this(cacheMin, cacheMax, cacheNullCounts, null);
     }
 
     public BinaryTableStats(
             BinaryRow cacheMin,
             BinaryRow cacheMax,
-            long[] cacheNullCounts,
+            Long[] cacheNullCounts,
             @Nullable FieldStats[] cacheArray) {
         this.cacheMin = cacheMin;
         this.cacheMax = cacheMax;
@@ -88,10 +89,15 @@ public class BinaryTableStats {
         return cacheMax;
     }
 
-    public long[] nullCounts() {
+    public Long[] nullCounts() {
         if (cacheNullCounts == null) {
             checkNotNull(row);
-            cacheNullCounts = row.getArray(2).toLongArray();
+            InternalArray internalArray = row.getArray(2);
+            Long[] array = new Long[internalArray.size()];
+            for (int i = 0; i < array.length; i++) {
+                array[i] = internalArray.isNullAt(i) ? null : 
internalArray.getLong(i);
+            }
+            cacheNullCounts = array; // memoize; the shared return below hands it out
         }
         return cacheNullCounts;
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 928bf331..62578f31 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -71,7 +71,7 @@ public class FieldStatsArraySerializer {
         int rowFieldCount = stats.length;
         GenericRow minValues = new GenericRow(rowFieldCount);
         GenericRow maxValues = new GenericRow(rowFieldCount);
-        long[] nullCounts = new long[rowFieldCount];
+        Long[] nullCounts = new Long[rowFieldCount];
         for (int i = 0; i < rowFieldCount; i++) {
             minValues.setField(i, stats[i].minValue());
             maxValues.setField(i, stats[i].maxValue());
@@ -91,6 +91,7 @@ public class FieldStatsArraySerializer {
     public FieldStats[] fromBinary(BinaryTableStats array, @Nullable Long 
rowCount) {
         int fieldCount = indexMapping == null ? fieldGetters.length : 
indexMapping.length;
         FieldStats[] stats = new FieldStats[fieldCount];
+        Long[] nullCounts = array.nullCounts();
         for (int i = 0; i < fieldCount; i++) {
             int fieldIndex = indexMapping == null ? i : indexMapping[i];
             if (fieldIndex < 0 || fieldIndex >= array.min().getFieldCount()) {
@@ -108,7 +109,7 @@ public class FieldStatsArraySerializer {
                 Object max = 
fieldGetters[fieldIndex].getFieldOrNull(array.max());
                 max = converter == null || max == null ? max : 
converter.cast(max);
 
-                stats[i] = new FieldStats(min, max, 
array.nullCounts()[fieldIndex]);
+                stats[i] = new FieldStats(min, max, nullCounts[fieldIndex]);
             }
         }
         return stats;
@@ -118,7 +119,7 @@ public class FieldStatsArraySerializer {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(0, "_MIN_VALUES", newBytesType(false)));
         fields.add(new DataField(1, "_MAX_VALUES", newBytesType(false)));
-        fields.add(new DataField(2, "_NULL_COUNTS", new ArrayType(new 
BigIntType(false))));
+        fields.add(new DataField(2, "_NULL_COUNTS", new ArrayType(new 
BigIntType(true))));
         return new RowType(fields);
     }
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
index 778fca23..7ed72cd1 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -42,11 +42,11 @@ public class PredicateBuilderTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -61,11 +61,11 @@ public class PredicateBuilderTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 645c22c2..0bd84f6c 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -42,10 +42,10 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
 
         
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.notEqual(0, 5));
@@ -59,8 +59,8 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -73,11 +73,11 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(5, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(5, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
 
         assertThat(predicate.negate().orElse(null)).isEqualTo(builder.equal(0, 
5));
@@ -91,8 +91,8 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -106,11 +106,11 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {6})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
 
         
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessOrEqual(0, 
5));
@@ -124,8 +124,8 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -139,11 +139,11 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {6})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
 
         
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessThan(0, 5));
@@ -157,8 +157,8 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -172,10 +172,10 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {6})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
 
         
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterOrEqual(0, 
5));
@@ -189,8 +189,8 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -204,10 +204,10 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {6})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
 
         
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterThan(0, 
5));
@@ -221,8 +221,8 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -234,8 +234,8 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(true);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
1)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
1L)})).isEqualTo(true);
 
         
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNotNull(0));
     }
@@ -248,9 +248,9 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
1)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, 
null, 3)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 
1L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, 
null, 3L)}))
                 .isEqualTo(false);
 
         
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNull(0));
@@ -267,9 +267,9 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -284,9 +284,9 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -301,12 +301,12 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -321,12 +321,12 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
     }
 
@@ -347,11 +347,12 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0L)}))
+                .isEqualTo(true);
     }
 
     @Test
@@ -372,11 +373,12 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0L)}))
+                .isEqualTo(true);
     }
 
     @Test
@@ -396,14 +398,15 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0L)}))
+                .isEqualTo(true);
     }
 
     @Test
@@ -424,14 +427,14 @@ public class PredicateTest {
         assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
         assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 
0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, 
null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 
0L)}))
                 .isEqualTo(false);
     }
 
@@ -449,21 +452,21 @@ public class PredicateTest {
                         predicate.test(
                                 3,
                                 new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(4, 
6, 0)
+                                    new FieldStats(3, 6, 0L), new 
FieldStats(4, 6, 0L)
                                 }))
                 .isEqualTo(true);
         assertThat(
                         predicate.test(
                                 3,
                                 new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(6, 
8, 0)
+                                    new FieldStats(3, 6, 0L), new 
FieldStats(6, 8, 0L)
                                 }))
                 .isEqualTo(false);
         assertThat(
                         predicate.test(
                                 3,
                                 new FieldStats[] {
-                                    new FieldStats(6, 7, 0), new FieldStats(4, 
6, 0)
+                                    new FieldStats(6, 7, 0L), new 
FieldStats(4, 6, 0L)
                                 }))
                 .isEqualTo(false);
 
@@ -485,21 +488,21 @@ public class PredicateTest {
                         predicate.test(
                                 3,
                                 new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(4, 
6, 0)
+                                    new FieldStats(3, 6, 0L), new 
FieldStats(4, 6, 0L)
                                 }))
                 .isEqualTo(true);
         assertThat(
                         predicate.test(
                                 3,
                                 new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(6, 
8, 0)
+                                    new FieldStats(3, 6, 0L), new 
FieldStats(6, 8, 0L)
                                 }))
                 .isEqualTo(true);
         assertThat(
                         predicate.test(
                                 3,
                                 new FieldStats[] {
-                                    new FieldStats(6, 7, 0), new FieldStats(8, 
10, 0)
+                                    new FieldStats(6, 7, 0L), new 
FieldStats(8, 10, 0L)
                                 }))
                 .isEqualTo(false);
 
@@ -512,11 +515,11 @@ public class PredicateTest {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new 
IntType()));
         Predicate predicate = builder.equal(0, 5);
 
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, 
null, 3)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, 
null, 3L)}))
                 .isEqualTo(false);
 
         // unknown stats, we don't know, likely to hit
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, 
null, 4)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, 
null, 4L)}))
                 .isEqualTo(true);
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
index b413a787..43303c9f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
@@ -38,7 +38,7 @@ public class BinaryTableStatsTest {
     public void testBinaryTableStats() {
         List<Integer> minList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
         List<Integer> maxList = Arrays.asList(11, 12, 13, 14, 15, 16, 17, 18, 
19);
-        long[] nullCounts = new long[] {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
+        Long[] nullCounts = new Long[] {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
 
         BinaryRow minRowData = binaryRow(minList);
         BinaryRow maxRowData = binaryRow(maxList);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
index 6eb3dd8d..957ede01 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -77,7 +77,7 @@ public class FieldStatsArraySerializerTest {
                         tableSchema.logicalRowType(), indexMapping, 
converterMapping);
         BinaryRow minRowData = row(1, 2, 3, 4);
         BinaryRow maxRowData = row(100, 99, 98, 97);
-        long[] nullCounts = new long[] {1, 0, 10, 100};
+        Long[] nullCounts = new Long[] {1L, 0L, 10L, 100L};
         BinaryTableStats dataTableStats = new BinaryTableStats(minRowData, 
maxRowData, nullCounts);
 
         FieldStats[] fieldStatsArray = 
dataTableStats.fields(fieldStatsArraySerializer, 1000L);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
index 2b4b1720..1fe39dae 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
@@ -47,24 +47,24 @@ public class FieldStatsCollectorTest {
         assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
-                            new FieldStats(1, 1, 0),
+                            new FieldStats(1, 1, 0L),
                             new FieldStats(
                                     BinaryString.fromString("Flink"),
                                     BinaryString.fromString("Flink"),
-                                    0),
-                            new FieldStats(null, null, 0)
+                                    0L),
+                            new FieldStats(null, null, 0L)
                         });
 
         collector.collect(GenericRow.of(3, null, new GenericArray(new int[] 
{3, 30})));
         assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
-                            new FieldStats(1, 3, 0),
+                            new FieldStats(1, 3, 0L),
                             new FieldStats(
                                     BinaryString.fromString("Flink"),
                                     BinaryString.fromString("Flink"),
-                                    1),
-                            new FieldStats(null, null, 0)
+                                    1L),
+                            new FieldStats(null, null, 0L)
                         });
 
         collector.collect(
@@ -75,24 +75,24 @@ public class FieldStatsCollectorTest {
         assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
-                            new FieldStats(1, 3, 1),
+                            new FieldStats(1, 3, 1L),
                             new FieldStats(
                                     BinaryString.fromString("Apache"),
                                     BinaryString.fromString("Flink"),
-                                    1),
-                            new FieldStats(null, null, 0)
+                                    1L),
+                            new FieldStats(null, null, 0L)
                         });
 
         collector.collect(GenericRow.of(2, BinaryString.fromString("Batch"), 
null));
         assertThat(collector.extract())
                 .isEqualTo(
                         new FieldStats[] {
-                            new FieldStats(1, 3, 1),
+                            new FieldStats(1, 3, 1L),
                             new FieldStats(
                                     BinaryString.fromString("Apache"),
                                     BinaryString.fromString("Flink"),
-                                    1),
-                            new FieldStats(null, null, 1)
+                                    1L),
+                            new FieldStats(null, null, 1L)
                         });
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
index e204fc60..45510bb7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -76,7 +76,7 @@ public class StatsTestUtils {
                 new FieldStatsArraySerializer(RowType.of(new IntType()));
         FieldStats[] array = new FieldStats[fieldCount];
         for (int i = 0; i < fieldCount; i++) {
-            array[i] = new FieldStats(null, null, 0);
+            array[i] = new FieldStats(null, null, 0L);
         }
         return statsConverter.toBinary(array);
     }
@@ -84,6 +84,6 @@ public class StatsTestUtils {
     public static BinaryTableStats newTableStats(int min, int max) {
         FieldStatsArraySerializer statsConverter =
                 new FieldStatsArraySerializer(RowType.of(new IntType()));
-        return statsConverter.toBinary(new FieldStats[] {new FieldStats(min, 
max, 0)});
+        return statsConverter.toBinary(new FieldStats[] {new FieldStats(min, 
max, 0L)});
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index e1a1c89c..959f3f9a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -499,6 +499,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
     @Test
     public void testReadFilter() throws Exception {
         FileStoreTable table = createFileStoreTable();
+        if (table.options().fileFormat().getFormatIdentifier().equals("parquet")) {
+            // TODO support parquet reader filter push down
+            return;
+        }
 
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 2ba20d45..5b0b04b3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -238,6 +238,10 @@ public abstract class FileStoreTableTestBase {
     @Test
     public void testReadFilter() throws Exception {
         FileStoreTable table = createFileStoreTable();
+        if (table.options().fileFormat().getFormatIdentifier().equals("parquet")) {
+            // TODO support parquet reader filter push down
+            return;
+        }
 
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
index 32389b8e..699afc92 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
@@ -20,15 +20,16 @@ package org.apache.flink.table.store.format.parquet;
 
 import org.apache.flink.table.store.data.BinaryString;
 import org.apache.flink.table.store.data.Decimal;
+import org.apache.flink.table.store.data.Timestamp;
 import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.store.format.FileStatsExtractor;
 import org.apache.flink.table.store.fs.FileIO;
 import org.apache.flink.table.store.fs.Path;
 import org.apache.flink.table.store.types.DataField;
-import org.apache.flink.table.store.types.DataTypeRoot;
 import org.apache.flink.table.store.types.DecimalType;
+import org.apache.flink.table.store.types.LocalZonedTimestampType;
 import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.utils.DateTimeUtils;
+import org.apache.flink.table.store.types.TimestampType;
 
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.BooleanStatistics;
@@ -37,7 +38,6 @@ import org.apache.parquet.column.statistics.FloatStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -65,7 +65,7 @@ public class ParquetFileStatsExtractor implements FileStatsExtractor {
 
     @Override
     public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
-        Map<String, Statistics> stats = ParquetUtil.extractColumnStats(fileIO, path);
+        Map<String, Statistics<?>> stats = ParquetUtil.extractColumnStats(fileIO, path);
 
         return IntStream.range(0, rowType.getFieldCount())
                 .mapToObj(
@@ -76,29 +76,23 @@ public class ParquetFileStatsExtractor implements FileStatsExtractor {
                 .toArray(FieldStats[]::new);
     }
 
-    private FieldStats toFieldStats(DataField field, Statistics stats) {
-        DataTypeRoot flinkType = field.type().getTypeRoot();
-        if (stats == null
-                || flinkType == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
-                || flinkType == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
-            throw new UnsupportedOperationException(
-                    "type "
-                            + field.type().getTypeRoot()
-                            + " not supported for extracting statistics in parquet format");
+    private FieldStats toFieldStats(DataField field, Statistics<?> stats) {
+        if (stats == null) {
+            return new FieldStats(null, null, null);
         }
         long nullCount = stats.getNumNulls();
         if (!stats.hasNonNullValue()) {
             return new FieldStats(null, null, nullCount);
         }
 
-        switch (flinkType) {
+        switch (field.type().getTypeRoot()) {
             case CHAR:
             case VARCHAR:
                 assertStatsClass(field, stats, BinaryStatistics.class);
-                BinaryStatistics binaryStats = (BinaryStatistics) stats;
+                BinaryStatistics stringStats = (BinaryStatistics) stats;
                 return new FieldStats(
-                        BinaryString.fromString(binaryStats.minAsString()),
-                        BinaryString.fromString(binaryStats.maxAsString()),
+                        BinaryString.fromString(stringStats.minAsString()),
+                        BinaryString.fromString(stringStats.maxAsString()),
                         nullCount);
             case BOOLEAN:
                 assertStatsClass(field, stats, BooleanStatistics.class);
@@ -109,14 +103,8 @@ public class ParquetFileStatsExtractor implements 
FileStatsExtractor {
                 DecimalType decimalType = (DecimalType) (field.type());
                 int precision = decimalType.getPrecision();
                 int scale = decimalType.getScale();
-                if (primitive.getOriginalType() != null
-                        && primitive.getLogicalTypeAnnotation()
-                                instanceof 
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
-                    return convertStatsToDecimalFieldStats(
-                            primitive, field, stats, precision, scale, 
nullCount);
-                } else {
-                    return new FieldStats(null, null, nullCount);
-                }
+                return convertStatsToDecimalFieldStats(
+                        primitive, field, stats, precision, scale, nullCount);
             case TINYINT:
                 assertStatsClass(field, stats, IntStatistics.class);
                 IntStatistics byteStats = (IntStatistics) stats;
@@ -128,6 +116,8 @@ public class ParquetFileStatsExtractor implements 
FileStatsExtractor {
                 return new FieldStats(
                         (short) shortStats.getMin(), (short) 
shortStats.getMax(), nullCount);
             case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
                 assertStatsClass(field, stats, IntStatistics.class);
                 IntStatistics intStats = (IntStatistics) stats;
                 return new FieldStats(
@@ -146,18 +136,34 @@ public class ParquetFileStatsExtractor implements 
FileStatsExtractor {
                 assertStatsClass(field, stats, DoubleStatistics.class);
                 DoubleStatistics doubleStats = (DoubleStatistics) stats;
                 return new FieldStats(doubleStats.getMin(), 
doubleStats.getMax(), nullCount);
-            case DATE:
-                assertStatsClass(field, stats, IntStatistics.class);
-                IntStatistics dateStats = (IntStatistics) stats;
-                return new FieldStats(
-                        
DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMin())),
-                        
DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMax())),
-                        nullCount);
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return toTimestampStats(stats, ((TimestampType) 
field.type()).getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return toTimestampStats(
+                        stats, ((LocalZonedTimestampType) 
field.type()).getPrecision());
             default:
                 return new FieldStats(null, null, nullCount);
         }
     }
 
+    private FieldStats toTimestampStats(Statistics<?> stats, int precision) {
+        if (precision <= 3) {
+            LongStatistics longStats = (LongStatistics) stats;
+            return new FieldStats(
+                    Timestamp.fromEpochMillis(longStats.getMin()),
+                    Timestamp.fromEpochMillis(longStats.getMax()),
+                    stats.getNumNulls());
+        } else if (precision <= 6) {
+            LongStatistics longStats = (LongStatistics) stats;
+            return new FieldStats(
+                    Timestamp.fromMicros(longStats.getMin()),
+                    Timestamp.fromMicros(longStats.getMax()),
+                    stats.getNumNulls());
+        } else {
+            return new FieldStats(null, null, stats.getNumNulls());
+        }
+    }
+
     /**
      * Parquet cannot provide statistics for decimal fields directly, but we 
can extract them from
      * primitive statistics.
@@ -165,7 +171,7 @@ public class ParquetFileStatsExtractor implements 
FileStatsExtractor {
     private FieldStats convertStatsToDecimalFieldStats(
             PrimitiveType primitive,
             DataField field,
-            Statistics stats,
+            Statistics<?> stats,
             int precision,
             int scale,
             long nullCount) {
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
index ff7afae6..e91e43c0 100644
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
@@ -27,13 +27,13 @@ import 
org.apache.flink.table.store.data.columnar.writable.WritableColumnVector;
 import org.apache.flink.table.store.format.FormatReaderFactory;
 import org.apache.flink.table.store.format.parquet.reader.ColumnReader;
 import org.apache.flink.table.store.format.parquet.reader.ParquetDecimalVector;
+import 
org.apache.flink.table.store.format.parquet.reader.ParquetTimestampVector;
 import org.apache.flink.table.store.fs.FileIO;
 import org.apache.flink.table.store.fs.Path;
 import org.apache.flink.table.store.options.Options;
 import org.apache.flink.table.store.reader.RecordReader;
 import org.apache.flink.table.store.reader.RecordReader.RecordIterator;
 import org.apache.flink.table.store.types.DataType;
-import org.apache.flink.table.store.types.DataTypeRoot;
 import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.utils.Pool;
 
@@ -223,10 +223,17 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
             WritableColumnVector[] writableVectors) {
         ColumnVector[] vectors = new ColumnVector[writableVectors.length];
         for (int i = 0; i < writableVectors.length; i++) {
-            vectors[i] =
-                    projectedTypes[i].getTypeRoot() == DataTypeRoot.DECIMAL
-                            ? new ParquetDecimalVector(writableVectors[i])
-                            : writableVectors[i];
+            switch (projectedTypes[i].getTypeRoot()) {
+                case DECIMAL:
+                    vectors[i] = new ParquetDecimalVector(writableVectors[i]);
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    vectors[i] = new 
ParquetTimestampVector(writableVectors[i]);
+                    break;
+                default:
+                    vectors[i] = writableVectors[i];
+            }
         }
         return new VectorizedColumnBatch(vectors);
     }
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
index 502c287d..84954599 100644
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
@@ -22,12 +22,15 @@ import org.apache.flink.table.store.types.ArrayType;
 import org.apache.flink.table.store.types.DataType;
 import org.apache.flink.table.store.types.DecimalType;
 import org.apache.flink.table.store.types.IntType;
+import org.apache.flink.table.store.types.LocalZonedTimestampType;
 import org.apache.flink.table.store.types.MapType;
 import org.apache.flink.table.store.types.MultisetType;
 import org.apache.flink.table.store.types.RowType;
+import org.apache.flink.table.store.types.TimestampType;
 
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -37,6 +40,10 @@ import org.apache.parquet.schema.Types;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
 /** Schema converter converts Parquet schema to and from Flink internal types. 
*/
 public class ParquetSchemaConverter {
 
@@ -73,28 +80,28 @@ public class ParquetSchemaConverter {
             case DECIMAL:
                 int precision = ((DecimalType) type).getPrecision();
                 int scale = ((DecimalType) type).getScale();
-                int numBytes = computeMinBytesForDecimalPrecision(precision);
-                return Types.primitive(
-                                
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
-                        .precision(precision)
-                        .scale(scale)
-                        .length(numBytes)
-                        .as(OriginalType.DECIMAL)
-                        .named(name);
+                if (is32BitDecimal(precision)) {
+                    return Types.primitive(INT32, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, 
precision))
+                            .named(name);
+                } else if (is64BitDecimal(precision)) {
+                    return Types.primitive(INT64, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, 
precision))
+                            .named(name);
+                } else {
+                    return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, 
precision))
+                            
.length(computeMinBytesForDecimalPrecision(precision))
+                            .named(name);
+                }
             case TINYINT:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
repetition)
-                        .as(OriginalType.INT_8)
-                        .named(name);
+                return Types.primitive(INT32, 
repetition).as(OriginalType.INT_8).named(name);
             case SMALLINT:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
repetition)
-                        .as(OriginalType.INT_16)
-                        .named(name);
+                return Types.primitive(INT32, 
repetition).as(OriginalType.INT_16).named(name);
             case INTEGER:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
repetition)
-                        .named(name);
+                return Types.primitive(INT32, repetition).named(name);
             case BIGINT:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, 
repetition)
-                        .named(name);
+                return Types.primitive(INT64, repetition).named(name);
             case FLOAT:
                 return Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, 
repetition)
                         .named(name);
@@ -102,17 +109,21 @@ public class ParquetSchemaConverter {
                 return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, 
repetition)
                         .named(name);
             case DATE:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
repetition)
-                        .as(OriginalType.DATE)
-                        .named(name);
+                return Types.primitive(INT32, 
repetition).as(OriginalType.DATE).named(name);
             case TIME_WITHOUT_TIME_ZONE:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
repetition)
-                        .as(OriginalType.TIME_MILLIS)
-                        .named(name);
+                return Types.primitive(INT32, 
repetition).as(OriginalType.TIME_MILLIS).named(name);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
+                TimestampType timestampType = (TimestampType) type;
+                return timestampType.getPrecision() <= 6
+                        ? Types.primitive(INT64, repetition).named(name)
+                        : 
Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
+                                .named(name);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, 
repetition)
-                        .named(name);
+                LocalZonedTimestampType localZonedTimestampType = 
(LocalZonedTimestampType) type;
+                return localZonedTimestampType.getPrecision() <= 6
+                        ? Types.primitive(INT64, repetition).named(name)
+                        : 
Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
+                                .named(name);
             case ARRAY:
                 ArrayType arrayType = (ArrayType) type;
                 return ConversionPatterns.listOfElements(
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
index 14e3a387..b86ac853 100644
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
@@ -44,17 +44,17 @@ public class ParquetUtil {
      * @return result sets as map, key is column name, value is statistics 
(for example, null count,
      *     minimum value, maximum value)
      */
-    public static Map<String, Statistics> extractColumnStats(FileIO fileIO, 
Path path)
+    public static Map<String, Statistics<?>> extractColumnStats(FileIO fileIO, 
Path path)
             throws IOException {
         ParquetMetadata parquetMetadata = getParquetReader(fileIO, 
path).getFooter();
         List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
-        Map<String, Statistics> resultStats = new HashMap<>();
+        Map<String, Statistics<?>> resultStats = new HashMap<>();
         for (BlockMetaData blockMetaData : blockMetaDataList) {
             List<ColumnChunkMetaData> columnChunkMetaDataList = 
blockMetaData.getColumns();
             for (ColumnChunkMetaData columnChunkMetaData : 
columnChunkMetaDataList) {
-                Statistics stats = columnChunkMetaData.getStatistics();
+                Statistics<?> stats = columnChunkMetaData.getStatistics();
                 String columnName = 
columnChunkMetaData.getPath().toDotString();
-                Statistics midStats;
+                Statistics<?> midStats;
                 if (!resultStats.containsKey(columnName)) {
                     midStats = stats;
                 } else {
@@ -79,7 +79,7 @@ public class ParquetUtil {
     }
 
     static void assertStatsClass(
-            DataField field, Statistics stats, Class<? extends Statistics> 
expectedClass) {
+            DataField field, Statistics<?> stats, Class<? extends 
Statistics<?>> expectedClass) {
         if (!expectedClass.isInstance(stats)) {
             throw new IllegalArgumentException(
                     "Expecting "
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
index 9e3cda53..6b4b76d3 100644
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
@@ -404,22 +404,38 @@ public class ArrayColumnReader extends 
BaseVectorizedColumnReader {
                 break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                HeapTimestampVector timestampVector = new 
HeapTimestampVector(total);
-                timestampVector.reset();
-                lcv.setChild(timestampVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapTimestampVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapTimestampVector) lcv.getChild())
-                                .setTimestamp(i, ((List<Timestamp>) 
valueList).get(i));
+                if (descriptor.getPrimitiveType().getPrimitiveTypeName()
+                        == PrimitiveType.PrimitiveTypeName.INT64) {
+                    HeapLongVector heapLongVector = new HeapLongVector(total);
+                    heapLongVector.reset();
+                    lcv.setChild(new ParquetTimestampVector(heapLongVector));
+                    for (int i = 0; i < valueList.size(); i++) {
+                        if (valueList.get(i) == null) {
+                            ((HeapLongVector) ((ParquetTimestampVector) 
lcv.getChild()).getVector())
+                                    .setNullAt(i);
+                        } else {
+                            ((HeapLongVector) ((ParquetTimestampVector) 
lcv.getChild()).getVector())
+                                            .vector[i] =
+                                    ((List<Long>) valueList).get(i);
+                        }
                     }
+                    break;
+                } else {
+                    HeapTimestampVector timestampVector = new 
HeapTimestampVector(total);
+                    timestampVector.reset();
+                    lcv.setChild(timestampVector);
+                    for (int i = 0; i < valueList.size(); i++) {
+                        if (valueList.get(i) == null) {
+                            ((HeapTimestampVector) 
lcv.getChild()).setNullAt(i);
+                        } else {
+                            ((HeapTimestampVector) lcv.getChild())
+                                    .setTimestamp(i, ((List<Timestamp>) 
valueList).get(i));
+                        }
+                    }
+                    break;
                 }
-                break;
             case DECIMAL:
-                PrimitiveType.PrimitiveTypeName primitiveTypeName =
-                        descriptor.getPrimitiveType().getPrimitiveTypeName();
-                switch (primitiveTypeName) {
+                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
                     case INT32:
                         HeapIntVector heapIntVector = new HeapIntVector(total);
                         heapIntVector.reset();
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
index 9efaef6e..3df80d5d 100644
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.table.store.data.columnar.writable.WritableColumnVector;
 import org.apache.flink.table.store.format.parquet.ParquetSchemaConverter;
 import org.apache.flink.table.store.types.ArrayType;
 import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypeChecks;
 import org.apache.flink.table.store.types.DecimalType;
 import org.apache.flink.table.store.types.IntType;
 import org.apache.flink.table.store.types.MapType;
@@ -100,6 +101,11 @@ public class ParquetSplitReaderUtil {
                         descriptors.get(0), 
pages.getPageReader(descriptors.get(0)));
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if 
(descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()
+                        == PrimitiveType.PrimitiveTypeName.INT64) {
+                    return new LongColumnReader(
+                            descriptors.get(0), 
pages.getPageReader(descriptors.get(0)));
+                }
                 return new TimestampColumnReader(
                         true, descriptors.get(0), 
pages.getPageReader(descriptors.get(0)));
             case DECIMAL:
@@ -245,11 +251,16 @@ public class ParquetSplitReaderUtil {
                 return new HeapBytesVector(batchSize);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                checkArgument(
-                        typeName == PrimitiveType.PrimitiveTypeName.INT96,
-                        "Unexpected type: %s",
-                        typeName);
-                return new HeapTimestampVector(batchSize);
+                int precision = DataTypeChecks.getPrecision(fieldType);
+                if (precision > 6) {
+                    checkArgument(
+                            typeName == PrimitiveType.PrimitiveTypeName.INT96,
+                            "Unexpected type: %s",
+                            typeName);
+                    return new HeapTimestampVector(batchSize);
+                } else {
+                    return new HeapLongVector(batchSize);
+                }
             case DECIMAL:
                 DecimalType decimalType = (DecimalType) fieldType;
                 if 
(ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetTimestampVector.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetTimestampVector.java
new file mode 100644
index 00000000..0625f42a
--- /dev/null
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetTimestampVector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet.reader;
+
+import org.apache.flink.table.store.data.Timestamp;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.LongColumnVector;
+import org.apache.flink.table.store.data.columnar.TimestampColumnVector;
+
+import org.apache.parquet.Preconditions;
+
+/**
+ * Parquet write timestamp precision 0-3 as int64 mills, 4-6 as int64 micros, 
7-9 as int96, this
+ * class wrap the real vector to provide {@link TimestampColumnVector} 
interface.
+ */
+public class ParquetTimestampVector implements TimestampColumnVector {
+
+    private final ColumnVector vector;
+
+    public ParquetTimestampVector(ColumnVector vector) {
+        this.vector = vector;
+    }
+
+    @Override
+    public Timestamp getTimestamp(int i, int precision) {
+        if (precision <= 3 && vector instanceof LongColumnVector) {
+            return Timestamp.fromEpochMillis(((LongColumnVector) 
vector).getLong(i));
+        } else if (precision <= 6 && vector instanceof LongColumnVector) {
+            return Timestamp.fromMicros(((LongColumnVector) 
vector).getLong(i));
+        } else {
+            Preconditions.checkArgument(
+                    vector instanceof TimestampColumnVector,
+                    "Reading timestamp type occur unsupported vector type: %s",
+                    vector.getClass());
+            return ((TimestampColumnVector) vector).getTimestamp(i, precision);
+        }
+    }
+
+    public ColumnVector getVector() {
+        return vector;
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return vector.isNullAt(i);
+    }
+}
diff --git 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
index 5425e9dd..c7dfaa8e 100644
--- 
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
+++ 
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.store.types.MapType;
 import org.apache.flink.table.store.types.MultisetType;
 import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.types.TimestampType;
-import org.apache.flink.table.store.utils.Preconditions;
 
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
@@ -50,6 +49,7 @@ import static 
org.apache.flink.table.store.format.parquet.ParquetSchemaConverter
 import static 
org.apache.flink.table.store.format.parquet.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
 import static 
org.apache.flink.table.store.format.parquet.reader.TimestampColumnReader.MILLIS_IN_DAY;
 import static 
org.apache.flink.table.store.format.parquet.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
+import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
 
 /** Writes a record to the Parquet API with the expected schema in order to be 
written to a file. */
 public class ParquetRowDataWriter {
@@ -104,10 +104,10 @@ public class ParquetRowDataWriter {
                     return new DoubleWriter();
                 case TIMESTAMP_WITHOUT_TIME_ZONE:
                     TimestampType timestampType = (TimestampType) t;
-                    return new TimestampWriter(timestampType.getPrecision());
+                    return createTimestampWriter(timestampType.getPrecision());
                 case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                     LocalZonedTimestampType localZonedTimestampType = 
(LocalZonedTimestampType) t;
-                    return new 
TimestampWriter(localZonedTimestampType.getPrecision());
+                    return 
createTimestampWriter(localZonedTimestampType.getPrecision());
                 default:
                     throw new UnsupportedOperationException("Unsupported type: 
" + type);
             }
@@ -134,6 +134,16 @@ public class ParquetRowDataWriter {
         }
     }
 
+    private FieldWriter createTimestampWriter(int precision) {
+        if (precision <= 3) {
+            return new TimestampMillsWriter(precision);
+        } else if (precision > 6) {
+            return new TimestampInt96Writer(precision);
+        } else {
+            return new TimestampMicrosWriter(precision);
+        }
+    }
+
     private interface FieldWriter {
 
         void write(InternalRow row, int ordinal);
@@ -294,16 +304,61 @@ public class ParquetRowDataWriter {
         }
     }
 
-    /**
-     * We only support INT96 bytes now, julianDay(4) + nanosOfDay(8). See
-     * 
https://github.com/apache/parquet-format/blob/master/DataTypes.md#timestamp 
TIMESTAMP_MILLIS
-     * and TIMESTAMP_MICROS are the deprecated ConvertedType.
-     */
-    private class TimestampWriter implements FieldWriter {
+    private class TimestampMillsWriter implements FieldWriter {
+
+        private final int precision;
+
+        private TimestampMillsWriter(int precision) {
+            checkArgument(precision <= 3);
+            this.precision = precision;
+        }
+
+        @Override
+        public void write(InternalRow row, int ordinal) {
+            writeTimestamp(row.getTimestamp(ordinal, precision));
+        }
+
+        @Override
+        public void write(InternalArray arrayData, int ordinal) {
+            writeTimestamp(arrayData.getTimestamp(ordinal, precision));
+        }
+
+        private void writeTimestamp(Timestamp value) {
+            recordConsumer.addLong(value.getMillisecond());
+        }
+    }
+
+    private class TimestampMicrosWriter implements FieldWriter {
+
+        private final int precision;
+
+        private TimestampMicrosWriter(int precision) {
+            checkArgument(precision > 3);
+            checkArgument(precision <= 6);
+            this.precision = precision;
+        }
+
+        @Override
+        public void write(InternalRow row, int ordinal) {
+            writeTimestamp(row.getTimestamp(ordinal, precision));
+        }
+
+        @Override
+        public void write(InternalArray arrayData, int ordinal) {
+            writeTimestamp(arrayData.getTimestamp(ordinal, precision));
+        }
+
+        private void writeTimestamp(Timestamp value) {
+            recordConsumer.addLong(value.toMicros());
+        }
+    }
+
+    private class TimestampInt96Writer implements FieldWriter {
 
         private final int precision;
 
-        private TimestampWriter(int precision) {
+        private TimestampInt96Writer(int precision) {
+            checkArgument(precision > 6);
             this.precision = precision;
         }
 
@@ -487,26 +542,34 @@ public class ParquetRowDataWriter {
     }
 
     private FieldWriter createDecimalWriter(int precision, int scale) {
-        Preconditions.checkArgument(
+        checkArgument(
                 precision <= DecimalType.MAX_PRECISION,
                 "Decimal precision %s exceeds max precision %s",
                 precision,
                 DecimalType.MAX_PRECISION);
 
-        /*
-         * This is optimizer for UnscaledBytesWriter.
-         */
-        class LongUnscaledBytesWriter implements FieldWriter {
-            private final int numBytes;
-            private final int initShift;
-            private final byte[] decimalBuffer;
+        class Int32Writer implements FieldWriter {
 
-            private LongUnscaledBytesWriter() {
-                this.numBytes = computeMinBytesForDecimalPrecision(precision);
-                this.initShift = 8 * (numBytes - 1);
-                this.decimalBuffer = new byte[numBytes];
+            @Override
+            public void write(InternalArray arrayData, int ordinal) {
+                long unscaledLong =
+                        (arrayData.getDecimal(ordinal, precision, 
scale)).toUnscaledLong();
+                addRecord(unscaledLong);
             }
 
+            @Override
+            public void write(InternalRow row, int ordinal) {
+                long unscaledLong = row.getDecimal(ordinal, precision, 
scale).toUnscaledLong();
+                addRecord(unscaledLong);
+            }
+
+            private void addRecord(long unscaledLong) {
+                recordConsumer.addInteger((int) unscaledLong);
+            }
+        }
+
+        class Int64Writer implements FieldWriter {
+
             @Override
             public void write(InternalArray arrayData, int ordinal) {
                 long unscaledLong =
@@ -521,15 +584,7 @@ public class ParquetRowDataWriter {
             }
 
             private void addRecord(long unscaledLong) {
-                int i = 0;
-                int shift = initShift;
-                while (i < numBytes) {
-                    decimalBuffer[i] = (byte) (unscaledLong >> shift);
-                    i += 1;
-                    shift -= 8;
-                }
-
-                
recordConsumer.addBinary(Binary.fromReusedByteArray(decimalBuffer, 0, 
numBytes));
+                recordConsumer.addLong(unscaledLong);
             }
         }
 
@@ -570,14 +625,12 @@ public class ParquetRowDataWriter {
             }
         }
 
-        // 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
-        // optimizer for UnscaledBytesWriter
-        if (ParquetSchemaConverter.is32BitDecimal(precision)
-                || ParquetSchemaConverter.is64BitDecimal(precision)) {
-            return new LongUnscaledBytesWriter();
+        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+            return new Int32Writer();
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+            return new Int64Writer();
+        } else {
+            return new UnscaledBytesWriter();
         }
-
-        // 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
-        return new UnscaledBytesWriter();
     }
 }
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
index 36bf3761..1df1bfa0 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
@@ -18,20 +18,28 @@
 
 package org.apache.flink.table.store.format.parquet;
 
+import org.apache.flink.table.store.format.FieldStats;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
 import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.types.ArrayType;
 import org.apache.flink.table.store.types.BigIntType;
+import org.apache.flink.table.store.types.BinaryType;
 import org.apache.flink.table.store.types.BooleanType;
 import org.apache.flink.table.store.types.CharType;
+import org.apache.flink.table.store.types.DataType;
 import org.apache.flink.table.store.types.DateType;
 import org.apache.flink.table.store.types.DecimalType;
 import org.apache.flink.table.store.types.DoubleType;
 import org.apache.flink.table.store.types.FloatType;
 import org.apache.flink.table.store.types.IntType;
+import org.apache.flink.table.store.types.MapType;
+import org.apache.flink.table.store.types.MultisetType;
 import org.apache.flink.table.store.types.RowType;
 import org.apache.flink.table.store.types.SmallIntType;
+import org.apache.flink.table.store.types.TimestampType;
 import org.apache.flink.table.store.types.TinyIntType;
+import org.apache.flink.table.store.types.VarBinaryType;
 import org.apache.flink.table.store.types.VarCharType;
 
 /** Tests for {@link ParquetFileStatsExtractor}. */
@@ -49,6 +57,8 @@ public class ParquetFileStatsExtractorTest extends FileStatsExtractorTestBase {
                         new CharType(8),
                         new VarCharType(8),
                         new BooleanType(),
+                        new BinaryType(8),
+                        new VarBinaryType(8),
                         new TinyIntType(),
                         new SmallIntType(),
                         new IntType(),
@@ -56,8 +66,33 @@ public class ParquetFileStatsExtractorTest extends FileStatsExtractorTestBase {
                         new FloatType(),
                         new DoubleType(),
                         new DecimalType(5, 2),
+                        new DecimalType(15, 2),
                         new DecimalType(38, 18),
-                        new DateType())
+                        new DateType(),
+                        new TimestampType(3),
+                        new TimestampType(6),
+                        new TimestampType(9),
+                        new ArrayType(new IntType()),
+                        new MapType(new VarCharType(8), new VarCharType(8)),
+                        new MultisetType(new VarCharType(8)))
                 .build();
     }
+
+    @Override
+    protected FieldStats regenerate(FieldStats stats, DataType type) {
+        switch (type.getTypeRoot()) {
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                TimestampType timestampType = (TimestampType) type;
+                if (timestampType.getPrecision() > 6) {
+                    return new FieldStats(null, null, stats.nullCount());
+                }
+                break;
+            case ARRAY:
+            case MAP:
+            case MULTISET:
+            case ROW:
+                return new FieldStats(null, null, null);
+        }
+        return stats;
+    }
 }
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java
index 54325335..e56e3a4a 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java
@@ -86,6 +86,8 @@ public class ParquetReadWriteTest {
                             new BigIntType(),
                             new FloatType(),
                             new DoubleType(),
+                            new TimestampType(3),
+                            new TimestampType(6),
                             new TimestampType(9),
                             new DecimalType(5, 0),
                             new DecimalType(15, 2),
@@ -362,6 +364,8 @@ public class ParquetReadWriteTest {
                         assertThat(row.isNullAt(30)).isTrue();
                         assertThat(row.isNullAt(31)).isTrue();
                         assertThat(row.isNullAt(32)).isTrue();
+                        assertThat(row.isNullAt(33)).isTrue();
+                        assertThat(row.isNullAt(34)).isTrue();
                     } else {
                         assertThat(row.getString(0)).hasToString("" + v);
                         assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
@@ -371,55 +375,56 @@ public class ParquetReadWriteTest {
                         assertThat(row.getLong(5)).isEqualTo(v.longValue());
                         assertThat(row.getFloat(6)).isEqualTo(v.floatValue());
                         assertThat(row.getDouble(7)).isEqualTo(v.doubleValue());
-                        assertThat(row.getTimestamp(8, 9).toLocalDateTime())
-                                .isEqualTo(toDateTime(v));
+                        assertThat(row.getTimestamp(8, 3)).isEqualTo(toMills(v));
+                        assertThat(row.getTimestamp(9, 6)).isEqualTo(toMicros(v));
+                        assertThat(row.getTimestamp(10, 9)).isEqualTo(toNanos(v));
                         if (Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0) == null) {
-                            assertThat(row.isNullAt(9)).isTrue();
-                            assertThat(row.isNullAt(12)).isTrue();
-                            assertThat(row.isNullAt(24)).isTrue();
-                            assertThat(row.isNullAt(27)).isTrue();
+                            assertThat(row.isNullAt(11)).isTrue();
+                            assertThat(row.isNullAt(14)).isTrue();
+                            assertThat(row.isNullAt(26)).isTrue();
+                            assertThat(row.isNullAt(29)).isTrue();
                         } else {
-                            assertThat(row.getDecimal(9, 5, 0))
+                            assertThat(row.getDecimal(11, 5, 0))
                                     .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
-                            assertThat(row.getDecimal(12, 5, 0))
+                            assertThat(row.getDecimal(14, 5, 0))
                                     .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
-                            assertThat(row.getArray(24).getDecimal(0, 5, 0))
+                            assertThat(row.getArray(26).getDecimal(0, 5, 0))
                                     .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
-                            assertThat(row.getArray(27).getDecimal(0, 5, 0))
+                            assertThat(row.getArray(29).getDecimal(0, 5, 0))
                                     .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
                         }
-                        assertThat(row.getDecimal(10, 15, 2))
+                        assertThat(row.getDecimal(12, 15, 2))
                                 .isEqualTo(Decimal.fromUnscaledLong(v.longValue(), 15, 2));
-                        assertThat(row.getDecimal(11, 20, 0).toBigDecimal())
+                        assertThat(row.getDecimal(13, 20, 0).toBigDecimal())
                                 .isEqualTo(BigDecimal.valueOf(v));
-                        assertThat(row.getDecimal(13, 15, 0).toBigDecimal())
+                        assertThat(row.getDecimal(15, 15, 0).toBigDecimal())
                                 .isEqualTo(BigDecimal.valueOf(v));
-                        assertThat(row.getDecimal(14, 20, 0).toBigDecimal())
+                        assertThat(row.getDecimal(16, 20, 0).toBigDecimal())
                                 .isEqualTo(BigDecimal.valueOf(v));
-                        assertThat(row.getArray(15).getString(0)).hasToString("" + v);
-                        assertThat(row.getArray(16).getBoolean(0)).isEqualTo(v % 2 == 0);
-                        assertThat(row.getArray(17).getByte(0)).isEqualTo(v.byteValue());
-                        assertThat(row.getArray(18).getShort(0)).isEqualTo(v.shortValue());
-                        assertThat(row.getArray(19).getInt(0)).isEqualTo(v.intValue());
-                        assertThat(row.getArray(20).getLong(0)).isEqualTo(v.longValue());
-                        assertThat(row.getArray(21).getFloat(0)).isEqualTo(v.floatValue());
-                        assertThat(row.getArray(22).getDouble(0)).isEqualTo(v.doubleValue());
-                        assertThat(row.getArray(23).getTimestamp(0, 9).toLocalDateTime())
+                        assertThat(row.getArray(17).getString(0)).hasToString("" + v);
+                        assertThat(row.getArray(18).getBoolean(0)).isEqualTo(v % 2 == 0);
+                        assertThat(row.getArray(19).getByte(0)).isEqualTo(v.byteValue());
+                        assertThat(row.getArray(20).getShort(0)).isEqualTo(v.shortValue());
+                        assertThat(row.getArray(21).getInt(0)).isEqualTo(v.intValue());
+                        assertThat(row.getArray(22).getLong(0)).isEqualTo(v.longValue());
+                        assertThat(row.getArray(23).getFloat(0)).isEqualTo(v.floatValue());
+                        assertThat(row.getArray(24).getDouble(0)).isEqualTo(v.doubleValue());
+                        assertThat(row.getArray(25).getTimestamp(0, 9).toLocalDateTime())
                                 .isEqualTo(toDateTime(v));
 
-                        assertThat(row.getArray(25).getDecimal(0, 15, 0))
+                        assertThat(row.getArray(27).getDecimal(0, 15, 0))
                                 .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
-                        assertThat(row.getArray(26).getDecimal(0, 20, 0))
+                        assertThat(row.getArray(28).getDecimal(0, 20, 0))
                                 .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
-                        assertThat(row.getArray(28).getDecimal(0, 15, 0))
+                        assertThat(row.getArray(30).getDecimal(0, 15, 0))
                                 .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
-                        assertThat(row.getArray(29).getDecimal(0, 20, 0))
+                        assertThat(row.getArray(31).getDecimal(0, 20, 0))
                                 .isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
-                        assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
-                        assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
-                        assertThat(row.getMap(32).keyArray().getString(0)).hasToString("" + v);
-                        assertThat(row.getRow(33, 2).getString(0)).hasToString("" + v);
-                        assertThat(row.getRow(33, 2).getInt(1)).isEqualTo(v.intValue());
+                        assertThat(row.getMap(32).valueArray().getString(0)).hasToString("" + v);
+                        assertThat(row.getMap(33).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
+                        assertThat(row.getMap(34).keyArray().getString(0)).hasToString("" + v);
+                        assertThat(row.getRow(35, 2).getString(0)).hasToString("" + v);
+                        assertThat(row.getRow(35, 2).getInt(1)).isEqualTo(v.intValue());
                     }
                     cnt.incrementAndGet();
                 });
@@ -450,7 +455,9 @@ public class ParquetReadWriteTest {
                 v.longValue(),
                 v.floatValue(),
                 v.doubleValue(),
-                Timestamp.fromLocalDateTime(toDateTime(v)),
+                toMills(v),
+                toMicros(v),
+                toNanos(v),
                 Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0),
                 Decimal.fromUnscaledLong(v.longValue(), 15, 2),
                 Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0),
@@ -465,7 +472,7 @@ public class ParquetReadWriteTest {
                 new GenericArray(new Object[] {v.longValue(), null}),
                 new GenericArray(new Object[] {v.floatValue(), null}),
                 new GenericArray(new Object[] {v.doubleValue(), null}),
-                new GenericArray(new Object[] {Timestamp.fromLocalDateTime(toDateTime(v)), null}),
+                new GenericArray(new Object[] {toNanos(v), null}),
                 Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0) == null
                         ? null
                         : new GenericArray(
@@ -492,6 +499,19 @@ public class ParquetReadWriteTest {
                 GenericRow.of(BinaryString.fromString("" + v), v));
     }
 
+    private Timestamp toMills(Integer v) {
+        return Timestamp.fromEpochMillis(
+                Timestamp.fromLocalDateTime(toDateTime(v)).getMillisecond());
+    }
+
+    private Timestamp toMicros(Integer v) {
+        return Timestamp.fromMicros(Timestamp.fromLocalDateTime(toDateTime(v)).toMicros());
+    }
+
+    private Timestamp toNanos(Integer v) {
+        return Timestamp.fromLocalDateTime(toDateTime(v));
+    }
+
     private LocalDateTime toDateTime(Integer v) {
         v = (v > 0 ? v : -v) % 10000;
         return BASE_TIME.plusNanos(v).plusSeconds(v);

Reply via email to