This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 97ca28c4 [FLINK-31329] Fix Parquet stats extractor
97ca28c4 is described below
commit 97ca28c4f97ed705b0ae27cd0b30954db1dd6a18
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 6 17:01:21 2023 +0800
[FLINK-31329] Fix Parquet stats extractor
This closes #582
---
.../apache/flink/table/store/data/Timestamp.java | 17 ++
.../table/store/data/columnar/ColumnarMap.java | 5 +
.../flink/table/store/file/predicate/In.java | 3 +-
.../table/store/file/predicate/IsNotNull.java | 3 +-
.../flink/table/store/file/predicate/IsNull.java | 3 +-
.../table/store/file/predicate/LeafPredicate.java | 3 +-
.../flink/table/store/file/predicate/NotIn.java | 3 +-
.../predicate/NullFalseLeafBinaryFunction.java | 7 +-
.../flink/table/store/format/FieldStats.java | 12 +-
.../flink/table/store/data/TimestampTest.java | 9 +
.../store/format/FileStatsExtractorTestBase.java | 7 +
flink-table-store-core/pom.xml | 29 ++++
.../flink/table/store/file/io/DataFileMeta.java | 2 +-
.../table/store/file/stats/BinaryTableStats.java | 16 +-
.../file/stats/FieldStatsArraySerializer.java | 7 +-
.../store/file/predicate/PredicateBuilderTest.java | 20 +--
.../table/store/file/predicate/PredicateTest.java | 187 +++++++++++----------
.../store/file/stats/BinaryTableStatsTest.java | 2 +-
.../file/stats/FieldStatsArraySerializerTest.java | 2 +-
.../store/file/stats/FieldStatsCollectorTest.java | 24 +--
.../table/store/file/stats/StatsTestUtils.java | 4 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 4 +
.../table/store/table/FileStoreTableTestBase.java | 4 +
.../format/parquet/ParquetFileStatsExtractor.java | 72 ++++----
.../store/format/parquet/ParquetReaderFactory.java | 17 +-
.../format/parquet/ParquetSchemaConverter.java | 63 ++++---
.../table/store/format/parquet/ParquetUtil.java | 10 +-
.../format/parquet/reader/ArrayColumnReader.java | 42 +++--
.../parquet/reader/ParquetSplitReaderUtil.java | 21 ++-
.../parquet/reader/ParquetTimestampVector.java | 63 +++++++
.../parquet/writer/ParquetRowDataWriter.java | 131 ++++++++++-----
.../parquet/ParquetFileStatsExtractorTest.java | 37 +++-
.../store/format/parquet/ParquetReadWriteTest.java | 88 ++++++----
33 files changed, 618 insertions(+), 299 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java
index f4c0ce62..bb155ff2 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/Timestamp.java
@@ -47,6 +47,10 @@ public final class Timestamp implements Comparable<Timestamp>, Serializable {
     // the number of milliseconds in a day
     private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+ public static final long MICROS_PER_MILLIS = 1000L;
+
+ public static final long NANOS_PER_MICROS = 1000L;
+
// this field holds the integral second and the milli-of-second
private final long millisecond;
@@ -104,6 +108,12 @@ public final class Timestamp implements Comparable<Timestamp>, Serializable {
return Instant.ofEpochSecond(epochSecond, nanoAdjustment);
}
+ /** Converts this {@link Timestamp} object to micros. */
+ public long toMicros() {
+ long micros = Math.multiplyExact(millisecond, MICROS_PER_MILLIS);
+ return micros + nanoOfMillisecond / NANOS_PER_MICROS;
+ }
+
@Override
public int compareTo(Timestamp that) {
int cmp = Long.compare(this.millisecond, that.millisecond);
@@ -205,6 +215,13 @@ public final class Timestamp implements Comparable<Timestamp>, Serializable {
return new Timestamp(millisecond, nanoOfMillisecond);
}
+ /** Creates an instance of {@link Timestamp} from micros. */
+ public static Timestamp fromMicros(long micros) {
+ long mills = Math.floorDiv(micros, MICROS_PER_MILLIS);
+ long nanos = (micros - mills * MICROS_PER_MILLIS) * NANOS_PER_MICROS;
+ return Timestamp.fromEpochMillis(mills, (int) nanos);
+ }
+
/**
     * Returns whether the timestamp data is small enough to be stored in a long of milliseconds.
*/
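Illustration, not part of the patch: the two conversions above round-trip at microsecond precision, and Math.floorDiv keeps negative instants correct, as a quick sketch shows:

    Timestamp ts = Timestamp.fromMicros(-1L); // millisecond = -1, nanoOfMillisecond = 999_000
    long micros = ts.toMicros();              // == -1L; nanos below one microsecond are truncated
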
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java
index 6402fd3b..14170214 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/columnar/ColumnarMap.java
@@ -70,4 +70,9 @@ public final class ColumnarMap implements InternalMap, Serializable {
         throw new UnsupportedOperationException(
                 "ColumnarMapData do not support hashCode, please hash fields one by one!");
}
+
+ @Override
+    public String toString() {
+        return getClass().getName() + "@" + Integer.toHexString(super.hashCode());
+ }
}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
index 9121554d..224dfdb4 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -51,7 +51,8 @@ public class In extends LeafFunction {
@Override
     public boolean test(
             DataType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
- if (rowCount == fieldStats.nullCount()) {
+ Long nullCount = fieldStats.nullCount();
+ if (nullCount != null && rowCount == nullCount) {
return false;
}
for (Object literal : literals) {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
index 14d1fcd0..19cab378 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNotNull.java
@@ -38,7 +38,8 @@ public class IsNotNull extends LeafUnaryFunction {
@Override
public boolean test(DataType type, long rowCount, FieldStats fieldStats) {
- return fieldStats.nullCount() < rowCount;
+ Long nullCount = fieldStats.nullCount();
+ return nullCount == null || nullCount < rowCount;
}
@Override
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
index 2a88bf1e..5f819a59 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/IsNull.java
@@ -38,7 +38,8 @@ public class IsNull extends LeafUnaryFunction {
@Override
public boolean test(DataType type, long rowCount, FieldStats fieldStats) {
- return fieldStats.nullCount() > 0;
+ Long nullCount = fieldStats.nullCount();
+ return nullCount == null || nullCount > 0;
}
@Override
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
index 6f77383e..6d179b53 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/LeafPredicate.java
@@ -90,7 +90,8 @@ public class LeafPredicate implements Predicate {
@Override
public boolean test(long rowCount, FieldStats[] fieldStats) {
FieldStats stats = fieldStats[fieldIndex];
- if (rowCount != stats.nullCount()) {
+ Long nullCount = stats.nullCount();
+ if (nullCount == null || rowCount != nullCount) {
// not all null
// min or max is null
// unknown stats
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
index 15e47c8c..5be1b4f1 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -51,7 +51,8 @@ public class NotIn extends LeafFunction {
@Override
     public boolean test(
             DataType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
- if (rowCount == fieldStats.nullCount()) {
+ Long nullCount = fieldStats.nullCount();
+ if (nullCount != null && rowCount == nullCount) {
return false;
}
for (Object literal : literals) {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
index ae620941..24e2af3d 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
@@ -44,8 +44,11 @@ public abstract class NullFalseLeafBinaryFunction extends LeafFunction {
     @Override
     public boolean test(
             DataType type, long rowCount, FieldStats fieldStats, List<Object> literals) {
- if (rowCount == fieldStats.nullCount() || literals.get(0) == null) {
- return false;
+ Long nullCount = fieldStats.nullCount();
+ if (nullCount != null) {
+ if (rowCount == nullCount || literals.get(0) == null) {
+ return false;
+ }
}
return test(type, rowCount, fieldStats, literals.get(0));
}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
index d2ec5467..78c9a678 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/format/FieldStats.java
@@ -27,23 +27,27 @@ public class FieldStats {
@Nullable private final Object minValue;
@Nullable private final Object maxValue;
- private final long nullCount;
+ private final Long nullCount;
-    public FieldStats(@Nullable Object minValue, @Nullable Object maxValue, long nullCount) {
+    public FieldStats(
+            @Nullable Object minValue, @Nullable Object maxValue, @Nullable Long nullCount) {
this.minValue = minValue;
this.maxValue = maxValue;
this.nullCount = nullCount;
}
+ @Nullable
public Object minValue() {
return minValue;
}
+ @Nullable
public Object maxValue() {
return maxValue;
}
- public long nullCount() {
+ @Nullable
+ public Long nullCount() {
return nullCount;
}
@@ -55,7 +59,7 @@ public class FieldStats {
FieldStats that = (FieldStats) o;
return Objects.equals(minValue, that.minValue)
&& Objects.equals(maxValue, that.maxValue)
- && nullCount == that.nullCount;
+ && Objects.equals(nullCount, that.nullCount);
}
@Override
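Illustration, not part of the patch: with nullCount now a boxed, nullable Long, null means "statistics unknown" rather than "zero nulls". The predicate changes above all follow the same null-safe convention, roughly:

    FieldStats unknown = new FieldStats(null, null, null); // extractor could not provide stats
    FieldStats allNull = new FieldStats(null, null, 100L); // 100 rows, every value null

    // mirrors IsNotNull.test above: unknown stats must never filter a file out
    Long nullCount = unknown.nullCount();
    boolean mayHaveNonNull = nullCount == null || nullCount < 100L; // true
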
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java
index 359d32a7..a704427a 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/data/TimestampTest.java
@@ -130,4 +130,13 @@ public class TimestampTest {
assertThat(Timestamp.fromInstant(instant).toString())
.isEqualTo("1970-01-01T00:00:00.123456789");
}
+
+ @Test
+ public void testToMicros() {
+        java.sql.Timestamp t = java.sql.Timestamp.valueOf("2005-01-02 00:00:00.123456789");
+        assertThat(Timestamp.fromSQLTimestamp(t).toString())
+                .isEqualTo("2005-01-02T00:00:00.123456789");
+        assertThat(Timestamp.fromMicros(Timestamp.fromSQLTimestamp(t).toMicros()).toString())
+                .isEqualTo("2005-01-02T00:00:00.123456");
+ }
}
diff --git a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
index 557564f7..ceec7595 100644
--- a/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
+++ b/flink-table-store-common/src/test/java/org/apache/flink/table/store/format/FileStatsExtractorTestBase.java
@@ -90,9 +90,16 @@ public abstract class FileStatsExtractorTestBase {
         FileStatsExtractor extractor = format.createStatsExtractor(rowType).get();
assertThat(extractor).isNotNull();
FieldStats[] actual = extractor.extract(fileIO, path);
+ for (int i = 0; i < expected.length; i++) {
+ expected[i] = regenerate(expected[i], rowType.getTypeAt(i));
+ }
assertThat(actual).isEqualTo(expected);
}
+ protected FieldStats regenerate(FieldStats stats, DataType type) {
+ return stats;
+ }
+
private List<GenericRow> createData(RowType rowType) {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numRows = random.nextInt(1, 100);
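Illustration, not part of the patch: the new regenerate hook lets a format-specific subclass normalize the expected stats before comparison. A hypothetical override for a format that stores timestamps at microsecond precision might look like:

    @Override
    protected FieldStats regenerate(FieldStats stats, DataType type) {
        if (type instanceof TimestampType) { // truncate expectations to what the file can hold
            Timestamp min = (Timestamp) stats.minValue();
            Timestamp max = (Timestamp) stats.maxValue();
            return new FieldStats(
                    min == null ? null : Timestamp.fromMicros(min.toMicros()),
                    max == null ? null : Timestamp.fromMicros(max.toMicros()),
                    stats.nullCount());
        }
        return stats;
    }
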
diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index 684b616a..15ccd5cc 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -116,6 +116,35 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java
index 22f070f9..9229ae5d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/DataFileMeta.java
@@ -46,7 +46,7 @@ public class DataFileMeta {
     // Append only data files don't have any key columns and meaningful level value. it will use
     // the following dummy values.
public static final BinaryTableStats EMPTY_KEY_STATS =
- new BinaryTableStats(EMPTY_ROW, EMPTY_ROW, new long[0]);
+ new BinaryTableStats(EMPTY_ROW, EMPTY_ROW, new Long[0]);
public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
public static final int DUMMY_LEVEL = 0;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
index accf24e8..b4586e69 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/BinaryTableStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.stats;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.GenericArray;
import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalArray;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.format.FieldStats;
@@ -40,20 +41,20 @@ public class BinaryTableStats {
@Nullable private FieldStats[] cacheArray;
@Nullable private BinaryRow cacheMin;
@Nullable private BinaryRow cacheMax;
- @Nullable private long[] cacheNullCounts;
+ @Nullable private Long[] cacheNullCounts;
public BinaryTableStats(InternalRow row) {
this.row = row;
}
-    public BinaryTableStats(BinaryRow cacheMin, BinaryRow cacheMax, long[] cacheNullCounts) {
+    public BinaryTableStats(BinaryRow cacheMin, BinaryRow cacheMax, Long[] cacheNullCounts) {
this(cacheMin, cacheMax, cacheNullCounts, null);
}
public BinaryTableStats(
BinaryRow cacheMin,
BinaryRow cacheMax,
- long[] cacheNullCounts,
+ Long[] cacheNullCounts,
@Nullable FieldStats[] cacheArray) {
this.cacheMin = cacheMin;
this.cacheMax = cacheMax;
@@ -88,10 +89,15 @@ public class BinaryTableStats {
return cacheMax;
}
- public long[] nullCounts() {
+ public Long[] nullCounts() {
if (cacheNullCounts == null) {
checkNotNull(row);
- cacheNullCounts = row.getArray(2).toLongArray();
+ InternalArray internalArray = row.getArray(2);
+ Long[] array = new Long[internalArray.size()];
+ for (int i = 0; i < array.length; i++) {
+                array[i] = internalArray.isNullAt(i) ? null : internalArray.getLong(i);
+ }
+ return array;
}
return cacheNullCounts;
}
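Illustration, not part of the patch: _NULL_COUNTS becomes a nullable BIGINT array (see the serializer change below), so the old row.getArray(2).toLongArray() can no longer be used: a primitive long[] has no way to represent a null element, which is why the loop above probes isNullAt per element, roughly:

    // assuming GenericArray accepts a boxed Object[] (as elsewhere in this codebase)
    GenericArray counts = new GenericArray(new Long[] {3L, null, 0L});
    counts.isNullAt(1); // true: must be checked before getLong(1)
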
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 928bf331..62578f31 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -71,7 +71,7 @@ public class FieldStatsArraySerializer {
int rowFieldCount = stats.length;
GenericRow minValues = new GenericRow(rowFieldCount);
GenericRow maxValues = new GenericRow(rowFieldCount);
- long[] nullCounts = new long[rowFieldCount];
+ Long[] nullCounts = new Long[rowFieldCount];
for (int i = 0; i < rowFieldCount; i++) {
minValues.setField(i, stats[i].minValue());
maxValues.setField(i, stats[i].maxValue());
@@ -91,6 +91,7 @@ public class FieldStatsArraySerializer {
     public FieldStats[] fromBinary(BinaryTableStats array, @Nullable Long rowCount) {
         int fieldCount = indexMapping == null ? fieldGetters.length : indexMapping.length;
FieldStats[] stats = new FieldStats[fieldCount];
+ Long[] nullCounts = array.nullCounts();
for (int i = 0; i < fieldCount; i++) {
int fieldIndex = indexMapping == null ? i : indexMapping[i];
if (fieldIndex < 0 || fieldIndex >= array.min().getFieldCount()) {
@@ -108,7 +109,7 @@ public class FieldStatsArraySerializer {
                 Object max = fieldGetters[fieldIndex].getFieldOrNull(array.max());
                 max = converter == null || max == null ? max : converter.cast(max);
-                stats[i] = new FieldStats(min, max, array.nullCounts()[fieldIndex]);
+ stats[i] = new FieldStats(min, max, nullCounts[fieldIndex]);
}
}
return stats;
@@ -118,7 +119,7 @@ public class FieldStatsArraySerializer {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, "_MIN_VALUES", newBytesType(false)));
fields.add(new DataField(1, "_MAX_VALUES", newBytesType(false)));
-        fields.add(new DataField(2, "_NULL_COUNTS", new ArrayType(new BigIntType(false))));
+        fields.add(new DataField(2, "_NULL_COUNTS", new ArrayType(new BigIntType(true))));
return new RowType(fields);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
index 778fca23..7ed72cd1 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateBuilderTest.java
@@ -42,11 +42,11 @@ public class PredicateBuilderTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -61,11 +61,11 @@ public class PredicateBuilderTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(2, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 2, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
index 645c22c2..0bd84f6c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateTest.java
@@ -42,10 +42,10 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {5})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.notEqual(0, 5));
@@ -59,8 +59,8 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -73,11 +73,11 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {5})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(5, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(5, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
         assertThat(predicate.negate().orElse(null)).isEqualTo(builder.equal(0, 5));
@@ -91,8 +91,8 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -106,11 +106,11 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {6})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
         assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessOrEqual(0, 5));
@@ -124,8 +124,8 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -139,11 +139,11 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {6})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 6, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.lessThan(0, 5));
@@ -157,8 +157,8 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 4, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -172,10 +172,10 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {6})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
         assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterOrEqual(0, 5));
@@ -189,8 +189,8 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -204,10 +204,10 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {6})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(4, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
         assertThat(predicate.negate().orElse(null)).isEqualTo(builder.greaterThan(0, 5));
@@ -221,8 +221,8 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -234,8 +234,8 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1L)})).isEqualTo(true);
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNotNull(0));
}
@@ -248,9 +248,9 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {4})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 3)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(5, 7, 1L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 3L)}))
.isEqualTo(false);
assertThat(predicate.negate().orElse(null)).isEqualTo(builder.isNull(0));
@@ -267,9 +267,9 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -284,9 +284,9 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -301,12 +301,12 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -321,12 +321,12 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
.isEqualTo(false);
}
@@ -347,11 +347,12 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0L)}))
+                .isEqualTo(true);
}
@Test
@@ -372,11 +373,12 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(true);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0L)}))
+                .isEqualTo(true);
}
@Test
@@ -396,14 +398,15 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(true);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(true);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(true);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)})).isEqualTo(true);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0L)}))
+                .isEqualTo(true);
}
@Test
@@ -424,14 +427,14 @@ public class PredicateTest {
assertThat(predicate.test(new Object[] {3})).isEqualTo(false);
assertThat(predicate.test(new Object[] {null})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0)})).isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0)})).isEqualTo(false);
-        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 1, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(3, 3, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(1, 3, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(0, 5, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(6, 7, 0L)})).isEqualTo(false);
+        assertThat(predicate.test(1, new FieldStats[] {new FieldStats(null, null, 1L)}))
                 .isEqualTo(false);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(29, 32, 0L)}))
.isEqualTo(false);
}
@@ -449,21 +452,21 @@ public class PredicateTest {
predicate.test(
3,
new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(4, 6, 0)
+                                    new FieldStats(3, 6, 0L), new FieldStats(4, 6, 0L)
}))
.isEqualTo(true);
assertThat(
predicate.test(
3,
new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(6, 8, 0)
+                                    new FieldStats(3, 6, 0L), new FieldStats(6, 8, 0L)
}))
.isEqualTo(false);
assertThat(
predicate.test(
3,
new FieldStats[] {
-                                    new FieldStats(6, 7, 0), new FieldStats(4, 6, 0)
+                                    new FieldStats(6, 7, 0L), new FieldStats(4, 6, 0L)
}))
.isEqualTo(false);
@@ -485,21 +488,21 @@ public class PredicateTest {
predicate.test(
3,
new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(4, 6, 0)
+                                    new FieldStats(3, 6, 0L), new FieldStats(4, 6, 0L)
}))
.isEqualTo(true);
assertThat(
predicate.test(
3,
new FieldStats[] {
-                                    new FieldStats(3, 6, 0), new FieldStats(6, 8, 0)
+                                    new FieldStats(3, 6, 0L), new FieldStats(6, 8, 0L)
}))
.isEqualTo(true);
assertThat(
predicate.test(
3,
new FieldStats[] {
-                                    new FieldStats(6, 7, 0), new FieldStats(8, 10, 0)
+                                    new FieldStats(6, 7, 0L), new FieldStats(8, 10, 0L)
}))
.isEqualTo(false);
@@ -512,11 +515,11 @@ public class PredicateTest {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
Predicate predicate = builder.equal(0, 5);
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 3)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 3L)}))
.isEqualTo(false);
// unknown stats, we don't know, likely to hit
-        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 4)}))
+        assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, 4L)}))
.isEqualTo(true);
}
}
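Illustration, not part of the patch: the last two assertions pin down the pruning contract: with min/max missing, a file may only be skipped when every row is null (nullCount == rowCount); anything less counts as unknown stats and the file is kept. By the same LeafPredicate logic above, a fully unknown stat (null nullCount) should also be kept, sketched as:

    assertThat(predicate.test(3, new FieldStats[] {new FieldStats(null, null, null)}))
            .isEqualTo(true);
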
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
index b413a787..43303c9f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/BinaryTableStatsTest.java
@@ -38,7 +38,7 @@ public class BinaryTableStatsTest {
public void testBinaryTableStats() {
List<Integer> minList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
         List<Integer> maxList = Arrays.asList(11, 12, 13, 14, 15, 16, 17, 18, 19);
- long[] nullCounts = new long[] {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
+ Long[] nullCounts = new Long[] {0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L};
BinaryRow minRowData = binaryRow(minList);
BinaryRow maxRowData = binaryRow(maxList);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
index 6eb3dd8d..957ede01 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -77,7 +77,7 @@ public class FieldStatsArraySerializerTest {
                         tableSchema.logicalRowType(), indexMapping, converterMapping);
BinaryRow minRowData = row(1, 2, 3, 4);
BinaryRow maxRowData = row(100, 99, 98, 97);
- long[] nullCounts = new long[] {1, 0, 10, 100};
+ Long[] nullCounts = new Long[] {1L, 0L, 10L, 100L};
         BinaryTableStats dataTableStats = new BinaryTableStats(minRowData, maxRowData, nullCounts);
         FieldStats[] fieldStatsArray = dataTableStats.fields(fieldStatsArraySerializer, 1000L);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
index 2b4b1720..1fe39dae 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsCollectorTest.java
@@ -47,24 +47,24 @@ public class FieldStatsCollectorTest {
assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
- new FieldStats(1, 1, 0),
+ new FieldStats(1, 1, 0L),
new FieldStats(
BinaryString.fromString("Flink"),
BinaryString.fromString("Flink"),
- 0),
- new FieldStats(null, null, 0)
+ 0L),
+ new FieldStats(null, null, 0L)
});
         collector.collect(GenericRow.of(3, null, new GenericArray(new int[] {3, 30})));
assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
- new FieldStats(1, 3, 0),
+ new FieldStats(1, 3, 0L),
new FieldStats(
BinaryString.fromString("Flink"),
BinaryString.fromString("Flink"),
- 1),
- new FieldStats(null, null, 0)
+ 1L),
+ new FieldStats(null, null, 0L)
});
collector.collect(
@@ -75,24 +75,24 @@ public class FieldStatsCollectorTest {
assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
- new FieldStats(1, 3, 1),
+ new FieldStats(1, 3, 1L),
new FieldStats(
BinaryString.fromString("Apache"),
BinaryString.fromString("Flink"),
- 1),
- new FieldStats(null, null, 0)
+ 1L),
+ new FieldStats(null, null, 0L)
});
collector.collect(GenericRow.of(2, BinaryString.fromString("Batch"),
null));
assertThat(collector.extract())
.isEqualTo(
new FieldStats[] {
- new FieldStats(1, 3, 1),
+ new FieldStats(1, 3, 1L),
new FieldStats(
BinaryString.fromString("Apache"),
BinaryString.fromString("Flink"),
- 1),
- new FieldStats(null, null, 1)
+ 1L),
+ new FieldStats(null, null, 1L)
});
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
index e204fc60..45510bb7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -76,7 +76,7 @@ public class StatsTestUtils {
new FieldStatsArraySerializer(RowType.of(new IntType()));
FieldStats[] array = new FieldStats[fieldCount];
for (int i = 0; i < fieldCount; i++) {
- array[i] = new FieldStats(null, null, 0);
+ array[i] = new FieldStats(null, null, 0L);
}
return statsConverter.toBinary(array);
}
@@ -84,6 +84,6 @@ public class StatsTestUtils {
public static BinaryTableStats newTableStats(int min, int max) {
FieldStatsArraySerializer statsConverter =
new FieldStatsArraySerializer(RowType.of(new IntType()));
-        return statsConverter.toBinary(new FieldStats[] {new FieldStats(min, max, 0)});
+        return statsConverter.toBinary(new FieldStats[] {new FieldStats(min, max, 0L)});
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index e1a1c89c..959f3f9a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -499,6 +499,10 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
@Test
public void testReadFilter() throws Exception {
FileStoreTable table = createFileStoreTable();
+        if (table.options().fileFormat().getFormatIdentifier().equals("parquet")) {
+ // TODO support parquet reader filter push down
+ return;
+ }
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 2ba20d45..5b0b04b3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -238,6 +238,10 @@ public abstract class FileStoreTableTestBase {
@Test
public void testReadFilter() throws Exception {
FileStoreTable table = createFileStoreTable();
+        if (table.options().fileFormat().getFormatIdentifier().equals("parquet")) {
+ // TODO support parquet reader filter push down
+ return;
+ }
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
diff --git a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
index 32389b8e..699afc92 100644
--- a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
+++ b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractor.java
@@ -20,15 +20,16 @@ package org.apache.flink.table.store.format.parquet;
import org.apache.flink.table.store.data.BinaryString;
import org.apache.flink.table.store.data.Decimal;
+import org.apache.flink.table.store.data.Timestamp;
import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.store.format.FileStatsExtractor;
import org.apache.flink.table.store.fs.FileIO;
import org.apache.flink.table.store.fs.Path;
import org.apache.flink.table.store.types.DataField;
-import org.apache.flink.table.store.types.DataTypeRoot;
import org.apache.flink.table.store.types.DecimalType;
+import org.apache.flink.table.store.types.LocalZonedTimestampType;
import org.apache.flink.table.store.types.RowType;
-import org.apache.flink.table.store.utils.DateTimeUtils;
+import org.apache.flink.table.store.types.TimestampType;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
@@ -37,7 +38,6 @@ import org.apache.parquet.column.statistics.FloatStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -65,7 +65,7 @@ public class ParquetFileStatsExtractor implements FileStatsExtractor {
@Override
public FieldStats[] extract(FileIO fileIO, Path path) throws IOException {
-        Map<String, Statistics> stats = ParquetUtil.extractColumnStats(fileIO, path);
+        Map<String, Statistics<?>> stats = ParquetUtil.extractColumnStats(fileIO, path);
return IntStream.range(0, rowType.getFieldCount())
.mapToObj(
@@ -76,29 +76,23 @@ public class ParquetFileStatsExtractor implements FileStatsExtractor {
.toArray(FieldStats[]::new);
}
- private FieldStats toFieldStats(DataField field, Statistics stats) {
- DataTypeRoot flinkType = field.type().getTypeRoot();
- if (stats == null
- || flinkType == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE
- || flinkType == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
- throw new UnsupportedOperationException(
- "type "
- + field.type().getTypeRoot()
- + " not supported for extracting statistics in
parquet format");
+ private FieldStats toFieldStats(DataField field, Statistics<?> stats) {
+ if (stats == null) {
+ return new FieldStats(null, null, null);
}
long nullCount = stats.getNumNulls();
if (!stats.hasNonNullValue()) {
return new FieldStats(null, null, nullCount);
}
- switch (flinkType) {
+ switch (field.type().getTypeRoot()) {
case CHAR:
case VARCHAR:
assertStatsClass(field, stats, BinaryStatistics.class);
- BinaryStatistics binaryStats = (BinaryStatistics) stats;
+ BinaryStatistics stringStats = (BinaryStatistics) stats;
return new FieldStats(
- BinaryString.fromString(binaryStats.minAsString()),
- BinaryString.fromString(binaryStats.maxAsString()),
+ BinaryString.fromString(stringStats.minAsString()),
+ BinaryString.fromString(stringStats.maxAsString()),
nullCount);
case BOOLEAN:
assertStatsClass(field, stats, BooleanStatistics.class);
@@ -109,14 +103,8 @@ public class ParquetFileStatsExtractor implements FileStatsExtractor {
DecimalType decimalType = (DecimalType) (field.type());
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
- if (primitive.getOriginalType() != null
- && primitive.getLogicalTypeAnnotation()
-                            instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
- return convertStatsToDecimalFieldStats(
-                    primitive, field, stats, precision, scale, nullCount);
- } else {
- return new FieldStats(null, null, nullCount);
- }
+ return convertStatsToDecimalFieldStats(
+ primitive, field, stats, precision, scale, nullCount);
case TINYINT:
assertStatsClass(field, stats, IntStatistics.class);
IntStatistics byteStats = (IntStatistics) stats;
@@ -128,6 +116,8 @@ public class ParquetFileStatsExtractor implements FileStatsExtractor {
return new FieldStats(
                     (short) shortStats.getMin(), (short) shortStats.getMax(), nullCount);
case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
assertStatsClass(field, stats, IntStatistics.class);
IntStatistics intStats = (IntStatistics) stats;
return new FieldStats(
@@ -146,18 +136,34 @@ public class ParquetFileStatsExtractor implements FileStatsExtractor {
assertStatsClass(field, stats, DoubleStatistics.class);
DoubleStatistics doubleStats = (DoubleStatistics) stats;
                 return new FieldStats(doubleStats.getMin(), doubleStats.getMax(), nullCount);
- case DATE:
- assertStatsClass(field, stats, IntStatistics.class);
- IntStatistics dateStats = (IntStatistics) stats;
- return new FieldStats(
-
DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMin())),
-
DateTimeUtils.toInternal(EPOCH_DAY.plusDays(dateStats.getMax())),
- nullCount);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return toTimestampStats(stats, ((TimestampType)
field.type()).getPrecision());
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return toTimestampStats(
+ stats, ((LocalZonedTimestampType)
field.type()).getPrecision());
default:
return new FieldStats(null, null, nullCount);
}
}
+ private FieldStats toTimestampStats(Statistics<?> stats, int precision) {
+ if (precision <= 3) {
+ LongStatistics longStats = (LongStatistics) stats;
+ return new FieldStats(
+ Timestamp.fromEpochMillis(longStats.getMin()),
+ Timestamp.fromEpochMillis(longStats.getMax()),
+ stats.getNumNulls());
+ } else if (precision <= 6) {
+ LongStatistics longStats = (LongStatistics) stats;
+ return new FieldStats(
+ Timestamp.fromMicros(longStats.getMin()),
+ Timestamp.fromMicros(longStats.getMax()),
+ stats.getNumNulls());
+ } else {
+ return new FieldStats(null, null, stats.getNumNulls());
+ }
+ }
+
/**
* Parquet cannot provide statistics for decimal fields directly, but we
can extract them from
* primitive statistics.
@@ -165,7 +171,7 @@ public class ParquetFileStatsExtractor implements
FileStatsExtractor {
private FieldStats convertStatsToDecimalFieldStats(
PrimitiveType primitive,
DataField field,
- Statistics stats,
+ Statistics<?> stats,
int precision,
int scale,
long nullCount) {
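
Note: toTimestampStats interprets the same INT64 statistics differently by
declared precision (<= 3 as epoch millis, <= 6 as epoch micros, above 6 no
usable min/max). A minimal standalone sketch of that dispatch using plain JDK
types; fromEpochMillis/fromMicros below are stand-ins for the store's
Timestamp factory methods, not its API:

    import java.time.Instant;

    public class TimestampStatsSketch {

        static Instant fromEpochMillis(long millis) {
            return Instant.ofEpochMilli(millis);
        }

        static Instant fromMicros(long micros) {
            // floorDiv/floorMod keep pre-1970 (negative) values correct
            return Instant.ofEpochSecond(
                    Math.floorDiv(micros, 1_000_000L),
                    Math.floorMod(micros, 1_000_000L) * 1_000L);
        }

        /** Interprets an INT64 min/max pair according to the declared precision. */
        static Instant[] toMinMax(long min, long max, int precision) {
            if (precision <= 3) {
                return new Instant[] {fromEpochMillis(min), fromEpochMillis(max)};
            } else if (precision <= 6) {
                return new Instant[] {fromMicros(min), fromMicros(max)};
            } else {
                return null; // INT96: fall back to null min/max, keep null count
            }
        }

        public static void main(String[] args) {
            long micros = 1_678_093_281_000_000L; // 2023-03-06T09:01:21Z in micros
            System.out.println(toMinMax(micros, micros, 6)[0]);
        }
    }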
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
index ff7afae6..e91e43c0 100644
---
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetReaderFactory.java
@@ -27,13 +27,13 @@ import
org.apache.flink.table.store.data.columnar.writable.WritableColumnVector;
import org.apache.flink.table.store.format.FormatReaderFactory;
import org.apache.flink.table.store.format.parquet.reader.ColumnReader;
import org.apache.flink.table.store.format.parquet.reader.ParquetDecimalVector;
+import
org.apache.flink.table.store.format.parquet.reader.ParquetTimestampVector;
import org.apache.flink.table.store.fs.FileIO;
import org.apache.flink.table.store.fs.Path;
import org.apache.flink.table.store.options.Options;
import org.apache.flink.table.store.reader.RecordReader;
import org.apache.flink.table.store.reader.RecordReader.RecordIterator;
import org.apache.flink.table.store.types.DataType;
-import org.apache.flink.table.store.types.DataTypeRoot;
import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.utils.Pool;
@@ -223,10 +223,17 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
WritableColumnVector[] writableVectors) {
ColumnVector[] vectors = new ColumnVector[writableVectors.length];
for (int i = 0; i < writableVectors.length; i++) {
- vectors[i] =
- projectedTypes[i].getTypeRoot() == DataTypeRoot.DECIMAL
- ? new ParquetDecimalVector(writableVectors[i])
- : writableVectors[i];
+ switch (projectedTypes[i].getTypeRoot()) {
+ case DECIMAL:
+ vectors[i] = new ParquetDecimalVector(writableVectors[i]);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ vectors[i] = new
ParquetTimestampVector(writableVectors[i]);
+ break;
+ default:
+ vectors[i] = writableVectors[i];
+ }
}
return new VectorizedColumnBatch(vectors);
}
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
index 502c287d..84954599 100644
---
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetSchemaConverter.java
@@ -22,12 +22,15 @@ import org.apache.flink.table.store.types.ArrayType;
import org.apache.flink.table.store.types.DataType;
import org.apache.flink.table.store.types.DecimalType;
import org.apache.flink.table.store.types.IntType;
+import org.apache.flink.table.store.types.LocalZonedTimestampType;
import org.apache.flink.table.store.types.MapType;
import org.apache.flink.table.store.types.MultisetType;
import org.apache.flink.table.store.types.RowType;
+import org.apache.flink.table.store.types.TimestampType;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -37,6 +40,10 @@ import org.apache.parquet.schema.Types;
import java.util.ArrayList;
import java.util.List;
+import static
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
/** Schema converter converts Parquet schema to and from Flink internal types.
*/
public class ParquetSchemaConverter {
@@ -73,28 +80,28 @@ public class ParquetSchemaConverter {
case DECIMAL:
int precision = ((DecimalType) type).getPrecision();
int scale = ((DecimalType) type).getScale();
- int numBytes = computeMinBytesForDecimalPrecision(precision);
- return Types.primitive(
-
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
- .precision(precision)
- .scale(scale)
- .length(numBytes)
- .as(OriginalType.DECIMAL)
- .named(name);
+ if (is32BitDecimal(precision)) {
+ return Types.primitive(INT32, repetition)
+ .as(LogicalTypeAnnotation.decimalType(scale,
precision))
+ .named(name);
+ } else if (is64BitDecimal(precision)) {
+ return Types.primitive(INT64, repetition)
+ .as(LogicalTypeAnnotation.decimalType(scale,
precision))
+ .named(name);
+ } else {
+ return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+ .as(LogicalTypeAnnotation.decimalType(scale,
precision))
+
.length(computeMinBytesForDecimalPrecision(precision))
+ .named(name);
+ }
case TINYINT:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
repetition)
- .as(OriginalType.INT_8)
- .named(name);
+ return Types.primitive(INT32,
repetition).as(OriginalType.INT_8).named(name);
case SMALLINT:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
repetition)
- .as(OriginalType.INT_16)
- .named(name);
+ return Types.primitive(INT32,
repetition).as(OriginalType.INT_16).named(name);
case INTEGER:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
repetition)
- .named(name);
+ return Types.primitive(INT32, repetition).named(name);
case BIGINT:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64,
repetition)
- .named(name);
+ return Types.primitive(INT64, repetition).named(name);
case FLOAT:
return Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT,
repetition)
.named(name);
@@ -102,17 +109,21 @@ public class ParquetSchemaConverter {
return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE,
repetition)
.named(name);
case DATE:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
repetition)
- .as(OriginalType.DATE)
- .named(name);
+ return Types.primitive(INT32,
repetition).as(OriginalType.DATE).named(name);
case TIME_WITHOUT_TIME_ZONE:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32,
repetition)
- .as(OriginalType.TIME_MILLIS)
- .named(name);
+ return Types.primitive(INT32,
repetition).as(OriginalType.TIME_MILLIS).named(name);
case TIMESTAMP_WITHOUT_TIME_ZONE:
+ TimestampType timestampType = (TimestampType) type;
+ return timestampType.getPrecision() <= 6
+ ? Types.primitive(INT64, repetition).named(name)
+ :
Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
+ .named(name);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96,
repetition)
- .named(name);
+ LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) type;
+ return localZonedTimestampType.getPrecision() <= 6
+ ? Types.primitive(INT64, repetition).named(name)
+ :
Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
+ .named(name);
case ARRAY:
ArrayType arrayType = (ArrayType) type;
return ConversionPatterns.listOfElements(
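
Note: two schema changes above are easy to miss. Decimals now pick the
smallest physical type that can hold the unscaled value instead of always
using FIXED_LEN_BYTE_ARRAY, and timestamps of precision <= 6 map to INT64
instead of INT96. A standalone sketch of the selection; the 9/18-digit
cutoffs are what is32BitDecimal/is64BitDecimal are assumed to check:

    public class ParquetPhysicalTypeSketch {

        enum PhysicalType { INT32, INT64, FIXED_LEN_BYTE_ARRAY, INT96 }

        static PhysicalType forDecimal(int precision) {
            if (precision <= 9) {
                return PhysicalType.INT32;  // unscaled value fits in an int
            } else if (precision <= 18) {
                return PhysicalType.INT64;  // unscaled value fits in a long
            } else {
                return PhysicalType.FIXED_LEN_BYTE_ARRAY;
            }
        }

        static PhysicalType forTimestamp(int precision) {
            // 0-6 fits in one long (millis or micros); nanosecond precision
            // keeps the legacy INT96 layout
            return precision <= 6 ? PhysicalType.INT64 : PhysicalType.INT96;
        }

        public static void main(String[] args) {
            System.out.println(forDecimal(5));    // INT32
            System.out.println(forDecimal(15));   // INT64
            System.out.println(forDecimal(38));   // FIXED_LEN_BYTE_ARRAY
            System.out.println(forTimestamp(3));  // INT64
            System.out.println(forTimestamp(9));  // INT96
        }
    }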
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
index 14e3a387..b86ac853 100644
---
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/ParquetUtil.java
@@ -44,17 +44,17 @@ public class ParquetUtil {
* @return results as a map whose key is the column name and whose value is
the statistics
* (for example, null count, minimum value, maximum value)
*/
- public static Map<String, Statistics> extractColumnStats(FileIO fileIO,
Path path)
+ public static Map<String, Statistics<?>> extractColumnStats(FileIO fileIO,
Path path)
throws IOException {
ParquetMetadata parquetMetadata = getParquetReader(fileIO,
path).getFooter();
List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
- Map<String, Statistics> resultStats = new HashMap<>();
+ Map<String, Statistics<?>> resultStats = new HashMap<>();
for (BlockMetaData blockMetaData : blockMetaDataList) {
List<ColumnChunkMetaData> columnChunkMetaDataList =
blockMetaData.getColumns();
for (ColumnChunkMetaData columnChunkMetaData :
columnChunkMetaDataList) {
- Statistics stats = columnChunkMetaData.getStatistics();
+ Statistics<?> stats = columnChunkMetaData.getStatistics();
String columnName =
columnChunkMetaData.getPath().toDotString();
- Statistics midStats;
+ Statistics<?> midStats;
if (!resultStats.containsKey(columnName)) {
midStats = stats;
} else {
@@ -79,7 +79,7 @@ public class ParquetUtil {
}
static void assertStatsClass(
- DataField field, Statistics stats, Class<? extends Statistics>
expectedClass) {
+ DataField field, Statistics<?> stats, Class<? extends
Statistics<?>> expectedClass) {
if (!expectedClass.isInstance(stats)) {
throw new IllegalArgumentException(
"Expecting "
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
index 9e3cda53..6b4b76d3 100644
---
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ArrayColumnReader.java
@@ -404,22 +404,38 @@ public class ArrayColumnReader extends
BaseVectorizedColumnReader {
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- HeapTimestampVector timestampVector = new
HeapTimestampVector(total);
- timestampVector.reset();
- lcv.setChild(timestampVector);
- for (int i = 0; i < valueList.size(); i++) {
- if (valueList.get(i) == null) {
- ((HeapTimestampVector) lcv.getChild()).setNullAt(i);
- } else {
- ((HeapTimestampVector) lcv.getChild())
- .setTimestamp(i, ((List<Timestamp>)
valueList).get(i));
+ if (descriptor.getPrimitiveType().getPrimitiveTypeName()
+ == PrimitiveType.PrimitiveTypeName.INT64) {
+ HeapLongVector heapLongVector = new HeapLongVector(total);
+ heapLongVector.reset();
+ lcv.setChild(new ParquetTimestampVector(heapLongVector));
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ ((HeapLongVector) ((ParquetTimestampVector)
lcv.getChild()).getVector())
+ .setNullAt(i);
+ } else {
+ ((HeapLongVector) ((ParquetTimestampVector)
lcv.getChild()).getVector())
+ .vector[i] =
+ ((List<Long>) valueList).get(i);
+ }
}
+ break;
+ } else {
+ HeapTimestampVector timestampVector = new
HeapTimestampVector(total);
+ timestampVector.reset();
+ lcv.setChild(timestampVector);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ ((HeapTimestampVector)
lcv.getChild()).setNullAt(i);
+ } else {
+ ((HeapTimestampVector) lcv.getChild())
+ .setTimestamp(i, ((List<Timestamp>)
valueList).get(i));
+ }
+ }
+ break;
}
- break;
case DECIMAL:
- PrimitiveType.PrimitiveTypeName primitiveTypeName =
- descriptor.getPrimitiveType().getPrimitiveTypeName();
- switch (primitiveTypeName) {
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
HeapIntVector heapIntVector = new HeapIntVector(total);
heapIntVector.reset();
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
index 9efaef6e..3df80d5d 100644
---
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -34,6 +34,7 @@ import
org.apache.flink.table.store.data.columnar.writable.WritableColumnVector;
import org.apache.flink.table.store.format.parquet.ParquetSchemaConverter;
import org.apache.flink.table.store.types.ArrayType;
import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypeChecks;
import org.apache.flink.table.store.types.DecimalType;
import org.apache.flink.table.store.types.IntType;
import org.apache.flink.table.store.types.MapType;
@@ -100,6 +101,11 @@ public class ParquetSplitReaderUtil {
descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if
(descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()
+ == PrimitiveType.PrimitiveTypeName.INT64) {
+ return new LongColumnReader(
+ descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ }
return new TimestampColumnReader(
true, descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
case DECIMAL:
@@ -245,11 +251,16 @@ public class ParquetSplitReaderUtil {
return new HeapBytesVector(batchSize);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT96,
- "Unexpected type: %s",
- typeName);
- return new HeapTimestampVector(batchSize);
+ int precision = DataTypeChecks.getPrecision(fieldType);
+ if (precision > 6) {
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT96,
+ "Unexpected type: %s",
+ typeName);
+ return new HeapTimestampVector(batchSize);
+ } else {
+ return new HeapLongVector(batchSize);
+ }
case DECIMAL:
DecimalType decimalType = (DecimalType) fieldType;
if
(ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
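
Note: the commit does not say why precision 7-9 keeps the INT96 path while
0-6 moves to a plain long vector. One plausible reason (an assumption, not
stated in the patch) is range: a signed int64 of microseconds covers hundreds
of millennia, but of nanoseconds only about +/-292 years around the epoch.
A quick back-of-envelope check:

    public class TimestampRangeSketch {
        public static void main(String[] args) {
            double secondsPerYear = 365.25 * 86_400;
            double yearsMicros = Long.MAX_VALUE / (secondsPerYear * 1e6);
            double yearsNanos = Long.MAX_VALUE / (secondsPerYear * 1e9);
            System.out.printf("micros in a long: +/- %.0f years%n", yearsMicros);
            System.out.printf("nanos  in a long: +/- %.0f years%n", yearsNanos);
        }
    }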
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetTimestampVector.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetTimestampVector.java
new file mode 100644
index 00000000..0625f42a
--- /dev/null
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/reader/ParquetTimestampVector.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.format.parquet.reader;
+
+import org.apache.flink.table.store.data.Timestamp;
+import org.apache.flink.table.store.data.columnar.ColumnVector;
+import org.apache.flink.table.store.data.columnar.LongColumnVector;
+import org.apache.flink.table.store.data.columnar.TimestampColumnVector;
+
+import org.apache.parquet.Preconditions;
+
+/**
+ * Parquet writes timestamps of precision 0-3 as int64 millis, 4-6 as int64
micros, and 7-9 as
+ * int96. This class wraps the real vector to provide the {@link
TimestampColumnVector} interface.
+ */
+public class ParquetTimestampVector implements TimestampColumnVector {
+
+ private final ColumnVector vector;
+
+ public ParquetTimestampVector(ColumnVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public Timestamp getTimestamp(int i, int precision) {
+ if (precision <= 3 && vector instanceof LongColumnVector) {
+ return Timestamp.fromEpochMillis(((LongColumnVector)
vector).getLong(i));
+ } else if (precision <= 6 && vector instanceof LongColumnVector) {
+ return Timestamp.fromMicros(((LongColumnVector)
vector).getLong(i));
+ } else {
+ Preconditions.checkArgument(
+ vector instanceof TimestampColumnVector,
+ "Reading timestamp type occur unsupported vector type: %s",
+ vector.getClass());
+ return ((TimestampColumnVector) vector).getTimestamp(i, precision);
+ }
+ }
+
+ public ColumnVector getVector() {
+ return vector;
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNullAt(i);
+ }
+}
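
Note: the wrapper only makes sense because the same raw long decodes to
different instants depending on the declared precision, so callers must pass
the precision through. A standalone model of the dispatch (the long[] stands
in for the wrapped LongColumnVector; this is a sketch, not the store's API):

    import java.time.Instant;

    public class TimestampVectorSketch {

        private final long[] vector;

        TimestampVectorSketch(long[] vector) {
            this.vector = vector;
        }

        Instant getTimestamp(int i, int precision) {
            if (precision <= 3) {
                return Instant.ofEpochMilli(vector[i]);
            } else if (precision <= 6) {
                return Instant.ofEpochSecond(
                        Math.floorDiv(vector[i], 1_000_000L),
                        Math.floorMod(vector[i], 1_000_000L) * 1_000L);
            }
            // precision 7-9 would delegate to a wrapped TimestampColumnVector
            throw new IllegalArgumentException("needs an INT96-backed vector");
        }

        public static void main(String[] args) {
            TimestampVectorSketch v =
                    new TimestampVectorSketch(new long[] {1_678_093_281_000L});
            System.out.println(v.getTimestamp(0, 3)); // 2023-03-06T09:01:21Z
            System.out.println(v.getTimestamp(0, 6)); // 1970-01-20T10:08:13.281Z
        }
    }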
diff --git
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
index 5425e9dd..c7dfaa8e 100644
---
a/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
+++
b/flink-table-store-format/src/main/java/org/apache/flink/table/store/format/parquet/writer/ParquetRowDataWriter.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.store.types.MapType;
import org.apache.flink.table.store.types.MultisetType;
import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.types.TimestampType;
-import org.apache.flink.table.store.utils.Preconditions;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
@@ -50,6 +49,7 @@ import static
org.apache.flink.table.store.format.parquet.ParquetSchemaConverter
import static
org.apache.flink.table.store.format.parquet.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
import static
org.apache.flink.table.store.format.parquet.reader.TimestampColumnReader.MILLIS_IN_DAY;
import static
org.apache.flink.table.store.format.parquet.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
+import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
/** Writes a record to the Parquet API with the expected schema in order to be
written to a file. */
public class ParquetRowDataWriter {
@@ -104,10 +104,10 @@ public class ParquetRowDataWriter {
return new DoubleWriter();
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) t;
- return new TimestampWriter(timestampType.getPrecision());
+ return createTimestampWriter(timestampType.getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) t;
- return new
TimestampWriter(localZonedTimestampType.getPrecision());
+ return
createTimestampWriter(localZonedTimestampType.getPrecision());
default:
throw new UnsupportedOperationException("Unsupported type:
" + type);
}
@@ -134,6 +134,16 @@ public class ParquetRowDataWriter {
}
}
+ private FieldWriter createTimestampWriter(int precision) {
+ if (precision <= 3) {
+ return new TimestampMillsWriter(precision);
+ } else if (precision > 6) {
+ return new TimestampInt96Writer(precision);
+ } else {
+ return new TimestampMicrosWriter(precision);
+ }
+ }
+
private interface FieldWriter {
void write(InternalRow row, int ordinal);
@@ -294,16 +304,61 @@ public class ParquetRowDataWriter {
}
}
- /**
- * We only support INT96 bytes now, julianDay(4) + nanosOfDay(8). See
- *
https://github.com/apache/parquet-format/blob/master/DataTypes.md#timestamp
TIMESTAMP_MILLIS
- * and TIMESTAMP_MICROS are the deprecated ConvertedType.
- */
- private class TimestampWriter implements FieldWriter {
+ private class TimestampMillsWriter implements FieldWriter {
+
+ private final int precision;
+
+ private TimestampMillsWriter(int precision) {
+ checkArgument(precision <= 3);
+ this.precision = precision;
+ }
+
+ @Override
+ public void write(InternalRow row, int ordinal) {
+ writeTimestamp(row.getTimestamp(ordinal, precision));
+ }
+
+ @Override
+ public void write(InternalArray arrayData, int ordinal) {
+ writeTimestamp(arrayData.getTimestamp(ordinal, precision));
+ }
+
+ private void writeTimestamp(Timestamp value) {
+ recordConsumer.addLong(value.getMillisecond());
+ }
+ }
+
+ private class TimestampMicrosWriter implements FieldWriter {
+
+ private final int precision;
+
+ private TimestampMicrosWriter(int precision) {
+ checkArgument(precision > 3);
+ checkArgument(precision <= 6);
+ this.precision = precision;
+ }
+
+ @Override
+ public void write(InternalRow row, int ordinal) {
+ writeTimestamp(row.getTimestamp(ordinal, precision));
+ }
+
+ @Override
+ public void write(InternalArray arrayData, int ordinal) {
+ writeTimestamp(arrayData.getTimestamp(ordinal, precision));
+ }
+
+ private void writeTimestamp(Timestamp value) {
+ recordConsumer.addLong(value.toMicros());
+ }
+ }
+
+ private class TimestampInt96Writer implements FieldWriter {
private final int precision;
- private TimestampWriter(int precision) {
+ private TimestampInt96Writer(int precision) {
+ checkArgument(precision > 6);
this.precision = precision;
}
@@ -487,26 +542,34 @@ public class ParquetRowDataWriter {
}
private FieldWriter createDecimalWriter(int precision, int scale) {
- Preconditions.checkArgument(
+ checkArgument(
precision <= DecimalType.MAX_PRECISION,
"Decimal precision %s exceeds max precision %s",
precision,
DecimalType.MAX_PRECISION);
- /*
- * This is optimizer for UnscaledBytesWriter.
- */
- class LongUnscaledBytesWriter implements FieldWriter {
- private final int numBytes;
- private final int initShift;
- private final byte[] decimalBuffer;
+ class Int32Writer implements FieldWriter {
- private LongUnscaledBytesWriter() {
- this.numBytes = computeMinBytesForDecimalPrecision(precision);
- this.initShift = 8 * (numBytes - 1);
- this.decimalBuffer = new byte[numBytes];
+ @Override
+ public void write(InternalArray arrayData, int ordinal) {
+ long unscaledLong =
+ (arrayData.getDecimal(ordinal, precision,
scale)).toUnscaledLong();
+ addRecord(unscaledLong);
}
+ @Override
+ public void write(InternalRow row, int ordinal) {
+ long unscaledLong = row.getDecimal(ordinal, precision,
scale).toUnscaledLong();
+ addRecord(unscaledLong);
+ }
+
+ private void addRecord(long unscaledLong) {
+ recordConsumer.addInteger((int) unscaledLong);
+ }
+ }
+
+ class Int64Writer implements FieldWriter {
+
@Override
public void write(InternalArray arrayData, int ordinal) {
long unscaledLong =
@@ -521,15 +584,7 @@ public class ParquetRowDataWriter {
}
private void addRecord(long unscaledLong) {
- int i = 0;
- int shift = initShift;
- while (i < numBytes) {
- decimalBuffer[i] = (byte) (unscaledLong >> shift);
- i += 1;
- shift -= 8;
- }
-
-
recordConsumer.addBinary(Binary.fromReusedByteArray(decimalBuffer, 0,
numBytes));
+ recordConsumer.addLong(unscaledLong);
}
}
@@ -570,14 +625,12 @@ public class ParquetRowDataWriter {
}
}
- // 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
- // optimizer for UnscaledBytesWriter
- if (ParquetSchemaConverter.is32BitDecimal(precision)
- || ParquetSchemaConverter.is64BitDecimal(precision)) {
- return new LongUnscaledBytesWriter();
+ if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+ return new Int32Writer();
+ } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+ return new Int64Writer();
+ } else {
+ return new UnscaledBytesWriter();
}
-
- // 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
- return new UnscaledBytesWriter();
}
}
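
Note: the decimal writers above now emit the unscaled value directly as an
int32 or int64 where it fits, and only decimals beyond 18 digits still go
through the FIXED_LEN_BYTE_ARRAY path. A standalone sketch of the three write
paths, with recordConsumer calls replaced by prints and the 16-byte width for
precision 38 hard-coded (computeMinBytesForDecimalPrecision is approximated):

    import java.math.BigDecimal;
    import java.util.Arrays;

    public class DecimalWriteSketch {

        static void write(BigDecimal value, int precision) {
            if (precision <= 9) {
                System.out.println("addInteger(" + value.unscaledValue().intValueExact() + ")");
            } else if (precision <= 18) {
                System.out.println("addLong(" + value.unscaledValue().longValueExact() + ")");
            } else {
                // FIXED_LEN_BYTE_ARRAY: big-endian two's-complement,
                // sign-extended to the fixed width for the precision
                byte[] raw = value.unscaledValue().toByteArray();
                byte[] fixed = new byte[16]; // width for precision 38
                byte pad = (byte) (value.signum() < 0 ? 0xFF : 0x00);
                Arrays.fill(fixed, 0, fixed.length - raw.length, pad);
                System.arraycopy(raw, 0, fixed, fixed.length - raw.length, raw.length);
                System.out.println("addBinary(" + Arrays.toString(fixed) + ")");
            }
        }

        public static void main(String[] args) {
            write(new BigDecimal("123.45"), 5);                // addInteger(12345)
            write(new BigDecimal("1234567890123.45"), 15);     // addLong(123456789012345)
            write(new BigDecimal("1.000000000000000002"), 38); // addBinary([0, 0, ...])
        }
    }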
diff --git
a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
index 36bf3761..1df1bfa0 100644
---
a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
+++
b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetFileStatsExtractorTest.java
@@ -18,20 +18,28 @@
package org.apache.flink.table.store.format.parquet;
+import org.apache.flink.table.store.format.FieldStats;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.format.FileStatsExtractorTestBase;
import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.types.ArrayType;
import org.apache.flink.table.store.types.BigIntType;
+import org.apache.flink.table.store.types.BinaryType;
import org.apache.flink.table.store.types.BooleanType;
import org.apache.flink.table.store.types.CharType;
+import org.apache.flink.table.store.types.DataType;
import org.apache.flink.table.store.types.DateType;
import org.apache.flink.table.store.types.DecimalType;
import org.apache.flink.table.store.types.DoubleType;
import org.apache.flink.table.store.types.FloatType;
import org.apache.flink.table.store.types.IntType;
+import org.apache.flink.table.store.types.MapType;
+import org.apache.flink.table.store.types.MultisetType;
import org.apache.flink.table.store.types.RowType;
import org.apache.flink.table.store.types.SmallIntType;
+import org.apache.flink.table.store.types.TimestampType;
import org.apache.flink.table.store.types.TinyIntType;
+import org.apache.flink.table.store.types.VarBinaryType;
import org.apache.flink.table.store.types.VarCharType;
/** Tests for {@link ParquetFileStatsExtractor}. */
@@ -49,6 +57,8 @@ public class ParquetFileStatsExtractorTest extends
FileStatsExtractorTestBase {
new CharType(8),
new VarCharType(8),
new BooleanType(),
+ new BinaryType(8),
+ new VarBinaryType(8),
new TinyIntType(),
new SmallIntType(),
new IntType(),
@@ -56,8 +66,33 @@ public class ParquetFileStatsExtractorTest extends
FileStatsExtractorTestBase {
new FloatType(),
new DoubleType(),
new DecimalType(5, 2),
+ new DecimalType(15, 2),
new DecimalType(38, 18),
- new DateType())
+ new DateType(),
+ new TimestampType(3),
+ new TimestampType(6),
+ new TimestampType(9),
+ new ArrayType(new IntType()),
+ new MapType(new VarCharType(8), new VarCharType(8)),
+ new MultisetType(new VarCharType(8)))
.build();
}
+
+ @Override
+ protected FieldStats regenerate(FieldStats stats, DataType type) {
+ switch (type.getTypeRoot()) {
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ TimestampType timestampType = (TimestampType) type;
+ if (timestampType.getPrecision() > 6) {
+ return new FieldStats(null, null, stats.nullCount());
+ }
+ break;
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case ROW:
+ return new FieldStats(null, null, null);
+ }
+ return stats;
+ }
}
diff --git
a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java
b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java
index 54325335..e56e3a4a 100644
---
a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java
+++
b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/parquet/ParquetReadWriteTest.java
@@ -86,6 +86,8 @@ public class ParquetReadWriteTest {
new BigIntType(),
new FloatType(),
new DoubleType(),
+ new TimestampType(3),
+ new TimestampType(6),
new TimestampType(9),
new DecimalType(5, 0),
new DecimalType(15, 2),
@@ -362,6 +364,8 @@ public class ParquetReadWriteTest {
assertThat(row.isNullAt(30)).isTrue();
assertThat(row.isNullAt(31)).isTrue();
assertThat(row.isNullAt(32)).isTrue();
+ assertThat(row.isNullAt(33)).isTrue();
+ assertThat(row.isNullAt(34)).isTrue();
} else {
assertThat(row.getString(0)).hasToString("" + v);
assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
@@ -371,55 +375,56 @@ public class ParquetReadWriteTest {
assertThat(row.getLong(5)).isEqualTo(v.longValue());
assertThat(row.getFloat(6)).isEqualTo(v.floatValue());
assertThat(row.getDouble(7)).isEqualTo(v.doubleValue());
- assertThat(row.getTimestamp(8, 9).toLocalDateTime())
- .isEqualTo(toDateTime(v));
+ assertThat(row.getTimestamp(8,
3)).isEqualTo(toMills(v));
+ assertThat(row.getTimestamp(9,
6)).isEqualTo(toMicros(v));
+ assertThat(row.getTimestamp(10,
9)).isEqualTo(toNanos(v));
if (Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5,
0) == null) {
- assertThat(row.isNullAt(9)).isTrue();
- assertThat(row.isNullAt(12)).isTrue();
- assertThat(row.isNullAt(24)).isTrue();
- assertThat(row.isNullAt(27)).isTrue();
+ assertThat(row.isNullAt(11)).isTrue();
+ assertThat(row.isNullAt(14)).isTrue();
+ assertThat(row.isNullAt(26)).isTrue();
+ assertThat(row.isNullAt(29)).isTrue();
} else {
- assertThat(row.getDecimal(9, 5, 0))
+ assertThat(row.getDecimal(11, 5, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
- assertThat(row.getDecimal(12, 5, 0))
+ assertThat(row.getDecimal(14, 5, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
- assertThat(row.getArray(24).getDecimal(0, 5, 0))
+ assertThat(row.getArray(26).getDecimal(0, 5, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
- assertThat(row.getArray(27).getDecimal(0, 5, 0))
+ assertThat(row.getArray(29).getDecimal(0, 5, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0));
}
- assertThat(row.getDecimal(10, 15, 2))
+ assertThat(row.getDecimal(12, 15, 2))
.isEqualTo(Decimal.fromUnscaledLong(v.longValue(), 15, 2));
- assertThat(row.getDecimal(11, 20, 0).toBigDecimal())
+ assertThat(row.getDecimal(13, 20, 0).toBigDecimal())
.isEqualTo(BigDecimal.valueOf(v));
- assertThat(row.getDecimal(13, 15, 0).toBigDecimal())
+ assertThat(row.getDecimal(15, 15, 0).toBigDecimal())
.isEqualTo(BigDecimal.valueOf(v));
- assertThat(row.getDecimal(14, 20, 0).toBigDecimal())
+ assertThat(row.getDecimal(16, 20, 0).toBigDecimal())
.isEqualTo(BigDecimal.valueOf(v));
-
assertThat(row.getArray(15).getString(0)).hasToString("" + v);
- assertThat(row.getArray(16).getBoolean(0)).isEqualTo(v
% 2 == 0);
-
assertThat(row.getArray(17).getByte(0)).isEqualTo(v.byteValue());
-
assertThat(row.getArray(18).getShort(0)).isEqualTo(v.shortValue());
-
assertThat(row.getArray(19).getInt(0)).isEqualTo(v.intValue());
-
assertThat(row.getArray(20).getLong(0)).isEqualTo(v.longValue());
-
assertThat(row.getArray(21).getFloat(0)).isEqualTo(v.floatValue());
-
assertThat(row.getArray(22).getDouble(0)).isEqualTo(v.doubleValue());
- assertThat(row.getArray(23).getTimestamp(0,
9).toLocalDateTime())
+
assertThat(row.getArray(17).getString(0)).hasToString("" + v);
+ assertThat(row.getArray(18).getBoolean(0)).isEqualTo(v
% 2 == 0);
+
assertThat(row.getArray(19).getByte(0)).isEqualTo(v.byteValue());
+
assertThat(row.getArray(20).getShort(0)).isEqualTo(v.shortValue());
+
assertThat(row.getArray(21).getInt(0)).isEqualTo(v.intValue());
+
assertThat(row.getArray(22).getLong(0)).isEqualTo(v.longValue());
+
assertThat(row.getArray(23).getFloat(0)).isEqualTo(v.floatValue());
+
assertThat(row.getArray(24).getDouble(0)).isEqualTo(v.doubleValue());
+ assertThat(row.getArray(25).getTimestamp(0,
9).toLocalDateTime())
.isEqualTo(toDateTime(v));
- assertThat(row.getArray(25).getDecimal(0, 15, 0))
+ assertThat(row.getArray(27).getDecimal(0, 15, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
- assertThat(row.getArray(26).getDecimal(0, 20, 0))
+ assertThat(row.getArray(28).getDecimal(0, 20, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
- assertThat(row.getArray(28).getDecimal(0, 15, 0))
+ assertThat(row.getArray(30).getDecimal(0, 15, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 15, 0));
- assertThat(row.getArray(29).getDecimal(0, 20, 0))
+ assertThat(row.getArray(31).getDecimal(0, 20, 0))
.isEqualTo(Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0));
-
assertThat(row.getMap(30).valueArray().getString(0)).hasToString("" + v);
-
assertThat(row.getMap(31).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
-
assertThat(row.getMap(32).keyArray().getString(0)).hasToString("" + v);
- assertThat(row.getRow(33,
2).getString(0)).hasToString("" + v);
- assertThat(row.getRow(33,
2).getInt(1)).isEqualTo(v.intValue());
+
assertThat(row.getMap(32).valueArray().getString(0)).hasToString("" + v);
+
assertThat(row.getMap(33).valueArray().getBoolean(0)).isEqualTo(v % 2 == 0);
+
assertThat(row.getMap(34).keyArray().getString(0)).hasToString("" + v);
+ assertThat(row.getRow(35,
2).getString(0)).hasToString("" + v);
+ assertThat(row.getRow(35,
2).getInt(1)).isEqualTo(v.intValue());
}
cnt.incrementAndGet();
});
@@ -450,7 +455,9 @@ public class ParquetReadWriteTest {
v.longValue(),
v.floatValue(),
v.doubleValue(),
- Timestamp.fromLocalDateTime(toDateTime(v)),
+ toMills(v),
+ toMicros(v),
+ toNanos(v),
Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0),
Decimal.fromUnscaledLong(v.longValue(), 15, 2),
Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0),
@@ -465,7 +472,7 @@ public class ParquetReadWriteTest {
new GenericArray(new Object[] {v.longValue(), null}),
new GenericArray(new Object[] {v.floatValue(), null}),
new GenericArray(new Object[] {v.doubleValue(), null}),
- new GenericArray(new Object[]
{Timestamp.fromLocalDateTime(toDateTime(v)), null}),
+ new GenericArray(new Object[] {toNanos(v), null}),
Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0) == null
? null
: new GenericArray(
@@ -492,6 +499,19 @@ public class ParquetReadWriteTest {
GenericRow.of(BinaryString.fromString("" + v), v));
}
+ private Timestamp toMills(Integer v) {
+ return Timestamp.fromEpochMillis(
+ Timestamp.fromLocalDateTime(toDateTime(v)).getMillisecond());
+ }
+
+ private Timestamp toMicros(Integer v) {
+ return
Timestamp.fromMicros(Timestamp.fromLocalDateTime(toDateTime(v)).toMicros());
+ }
+
+ private Timestamp toNanos(Integer v) {
+ return Timestamp.fromLocalDateTime(toDateTime(v));
+ }
+
private LocalDateTime toDateTime(Integer v) {
v = (v > 0 ? v : -v) % 10000;
return BASE_TIME.plusNanos(v).plusSeconds(v);