This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new aa7ff79 [FLINK-26346] Add statistics collecting to sst files
aa7ff79 is described below
commit aa7ff797e367df5f63ff8e8b38b6fe0e6f4c9206
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 9 11:05:42 2022 +0800
[FLINK-26346] Add statistics collecting to sst files
This closes #33
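
In short: file formats are now discovered through a FileFormatFactory SPI, and a format can optionally expose a FileStatsExtractor so statistics are read straight from the written file instead of being collected row by row. A rough caller-side sketch, assuming only what the diff below introduces (the class name and the "avro" identifier are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.store.file.stats.FileStatsExtractor;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;

    import java.util.Optional;

    public class FormatLookupSketch {
        public static void main(String[] args) {
            // Factories registered under
            // META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
            // win; any other identifier falls back to the generic FileFormatImpl.
            FileFormat format =
                    FileFormat.fromIdentifier(
                            FormatLookupSketch.class.getClassLoader(), "avro", new Configuration());
            // Empty unless the format knows how to read stats back from its own files.
            Optional<FileStatsExtractor> extractor =
                    format.createStatsExtractor(RowType.of(new IntType()));
            System.out.println("stats extractor present: " + extractor.isPresent());
        }
    }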
---
.../table/store/connector/sink/TestFileStore.java | 3 +
.../source/FileStoreSourceSplitGeneratorTest.java | 1 +
.../source/FileStoreSourceSplitSerializerTest.java | 1 +
.../store/connector/source/TestDataReadWrite.java | 2 +-
.../flink/table/store/file/FileStoreOptions.java | 1 +
.../table/store/file/{ => format}/FileFormat.java | 26 ++-
.../table/store/file/format/FileFormatFactory.java | 29 +++
.../store/file/{ => format}/FileFormatImpl.java | 2 +-
.../table/store/file/manifest/ManifestFile.java | 2 +-
.../table/store/file/manifest/ManifestList.java | 2 +-
.../store/file/mergetree/sst/SstFileMeta.java | 36 ++--
.../file/mergetree/sst/SstFileMetaSerializer.java | 17 +-
.../store/file/mergetree/sst/SstFileReader.java | 2 +-
.../store/file/mergetree/sst/SstFileWriter.java | 86 +++++++--
.../store/file/operation/FileStoreReadImpl.java | 2 +-
.../store/file/operation/FileStoreWriteImpl.java | 2 +-
.../store/file/stats/FieldStatsCollector.java | 11 +-
.../table/store/file/stats/FileStatsExtractor.java | 29 +++
.../flink/table/store/file/FileFormatTest.java | 2 +
.../FileStatsExtractingAvroFormat.java} | 40 ++--
.../FileStatsExtractingAvroFormatFactory.java | 35 ++++
.../FlushingFileFormat.java} | 23 ++-
.../ManifestCommittableSerializerTest.java | 3 +-
.../store/file/manifest/ManifestFileMetaTest.java | 3 +-
.../store/file/manifest/ManifestFileTest.java | 11 +-
.../store/file/manifest/ManifestListTest.java | 2 +-
.../table/store/file/mergetree/LevelsTest.java | 2 +-
.../table/store/file/mergetree/MergeTreeTest.java | 6 +-
.../file/mergetree/compact/CompactManagerTest.java | 11 +-
.../mergetree/compact/IntervalPartitionTest.java | 1 +
.../mergetree/compact/UniversalCompactionTest.java | 2 +-
.../store/file/mergetree/sst/SstFileTest.java | 60 +++---
.../file/mergetree/sst/SstTestDataGenerator.java | 11 +-
.../file/stats/FileStatsExtractorTestBase.java | 209 +++++++++++++++++++++
.../table/store/file/stats/StatsTestUtils.java | 9 +-
.../store/file/stats/TestFileStatsExtractor.java | 75 ++++++++
...flink.table.store.file.format.FileFormatFactory | 16 ++
flink-table-store-orc/pom.xml | 116 ++++++++++++
.../flink/table/store/orc/OrcFileFormat.java | 51 ++---
.../table/store/orc/OrcFileFormatFactory.java | 37 ++++
.../table/store/orc/OrcFileStatsExtractor.java | 191 +++++++++++++++++++
...flink.table.store.file.format.FileFormatFactory | 16 ++
.../table/store/orc/OrcFileStatsExtractorTest.java | 68 +++++++
pom.xml | 2 +
44 files changed, 1096 insertions(+), 160 deletions(-)
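
The writer-side core of the change, condensed for orientation (identifiers come from the diff below; treating the two skipped columns as the sequence-number and value-kind fields of the KeyValue record is an assumption, not spelled out in the patch): when the format supplies a FileStatsExtractor, SstFileWriter reads stats back from the finished file once and slices them into key stats and value stats; otherwise a FieldStatsCollector per key/value row type accumulates them while rows are written.

    // Condensed sketch of FileExtractingRollingFile#extractStats, not a verbatim excerpt.
    FieldStats[] rawStats = fileStatsExtractor.extract(path);
    int numKeyFields = keyType.getFieldCount();
    FieldStats[] keyStats = Arrays.copyOfRange(rawStats, 0, numKeyFields);
    // Skip the two columns assumed to sit between key and value fields
    // (sequence number and value kind) in the on-disk KeyValue record.
    FieldStats[] valueStats = Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length);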
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index db7a3de..6b38312 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -185,6 +185,9 @@ public class TestFileStore implements FileStore {
null,
null,
new FieldStats[] {
+ new FieldStats(null, null, 0)
+ },
+ new FieldStats[] {
new FieldStats(null, null, 0),
new FieldStats(null, null, 0),
new FieldStats(null, null, 0)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 355ddea..2df9a9f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -111,6 +111,7 @@ public class FileStoreSourceSplitGeneratorTest {
null, // not used
null, // not used
new FieldStats[] {new FieldStats(null, null, 0)}, // not used
+ new FieldStats[] {new FieldStats(null, null, 0)}, // not used
0, // not used
0, // not used
0 // not used
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 3772b90..0166f8f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -81,6 +81,7 @@ public class FileStoreSourceSplitSerializerTest {
row(0),
row(0),
new FieldStats[] {new FieldStats(null, null, 0)},
+ new FieldStats[] {new FieldStats(null, null, 0)},
0,
1,
level);
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
index aba6f5f..0f74f19 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestDataReadWrite.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 0233e2d..bf1b0e1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import java.io.Serializable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
similarity index 83%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
index 4309284..a403ac2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormat.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormat.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file;
+package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ConfigOption;
@@ -28,10 +28,13 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
/** Factory class which creates reader and writer factories for specific file format. */
public interface FileFormat {
@@ -62,14 +65,12 @@ public interface FileFormat {
return createReaderFactory(rowType, projection, new ArrayList<>());
}
- /** Create a {@link FileFormatImpl} from format identifier and format options. */
- static FileFormatImpl fromIdentifier(
- ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
- return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
+ default Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+ return Optional.empty();
}
/** Create a {@link FileFormatImpl} from table options. */
- static FileFormatImpl fromTableOptions(
+ static FileFormat fromTableOptions(
ClassLoader classLoader,
Configuration tableOptions,
ConfigOption<String> formatOption) {
@@ -78,4 +79,17 @@ public interface FileFormat {
ReadableConfig formatOptions = new DelegatingConfiguration(tableOptions, formatPrefix);
return fromIdentifier(classLoader, formatIdentifier, formatOptions);
}
+
+ /** Create a {@link FileFormatImpl} from format identifier and format options. */
+ static FileFormat fromIdentifier(
+ ClassLoader classLoader, String formatIdentifier, ReadableConfig formatOptions) {
+ ServiceLoader<FileFormatFactory> serviceLoader =
+ ServiceLoader.load(FileFormatFactory.class);
+ for (FileFormatFactory factory : serviceLoader) {
+ if (factory.identifier().equals(formatIdentifier.toLowerCase())) {
+ return factory.create(classLoader, formatOptions);
+ }
+ }
+ return new FileFormatImpl(classLoader, formatIdentifier, formatOptions);
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
new file mode 100644
index 0000000..58204df
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.format;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link FileFormat}. */
+public interface FileFormatFactory {
+
+ String identifier();
+
+ FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
similarity index 99%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
index cdcdd84..1846e04 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileFormatImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/format/FileFormatImpl.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file;
+package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index cbc1571..49f69fe 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -25,7 +25,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.stats.FieldStatsCollector;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index bde36a9..e15f546 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
index da419e8..3a8db5e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
@@ -42,7 +42,8 @@ public class SstFileMeta {
private final BinaryRowData minKey;
private final BinaryRowData maxKey;
- private final FieldStats[] stats;
+ private final FieldStats[] keyStats;
+ private final FieldStats[] valueStats;
private final long minSequenceNumber;
private final long maxSequenceNumber;
@@ -54,7 +55,8 @@ public class SstFileMeta {
long rowCount,
BinaryRowData minKey,
BinaryRowData maxKey,
- FieldStats[] stats,
+ FieldStats[] keyStats,
+ FieldStats[] valueStats,
long minSequenceNumber,
long maxSequenceNumber,
int level) {
@@ -64,7 +66,8 @@ public class SstFileMeta {
this.minKey = minKey;
this.maxKey = maxKey;
- this.stats = stats;
+ this.keyStats = keyStats;
+ this.valueStats = valueStats;
this.minSequenceNumber = minSequenceNumber;
this.maxSequenceNumber = maxSequenceNumber;
@@ -91,9 +94,12 @@ public class SstFileMeta {
return maxKey;
}
- /** Element in the array may be null, indicating the statistics of this field is unknown. */
- public FieldStats[] stats() {
- return stats;
+ public FieldStats[] keyStats() {
+ return keyStats;
+ }
+
+ public FieldStats[] valueStats() {
+ return valueStats;
}
public long minSequenceNumber() {
@@ -116,7 +122,8 @@ public class SstFileMeta {
rowCount,
minKey,
maxKey,
- stats,
+ keyStats,
+ valueStats,
minSequenceNumber,
maxSequenceNumber,
newLevel);
@@ -133,7 +140,8 @@ public class SstFileMeta {
&& rowCount == that.rowCount
&& Objects.equals(minKey, that.minKey)
&& Objects.equals(maxKey, that.maxKey)
- && Arrays.equals(stats, that.stats)
+ && Arrays.equals(keyStats, that.keyStats)
+ && Arrays.equals(valueStats, that.valueStats)
&& minSequenceNumber == that.minSequenceNumber
&& maxSequenceNumber == that.maxSequenceNumber
&& level == that.level;
@@ -149,7 +157,8 @@ public class SstFileMeta {
maxKey,
// by default, hash code of arrays are computed by reference, not by content.
// so we must use Arrays.hashCode to hash by content.
- Arrays.hashCode(stats),
+ Arrays.hashCode(keyStats),
+ Arrays.hashCode(valueStats),
minSequenceNumber,
maxSequenceNumber,
level);
@@ -158,13 +167,14 @@ public class SstFileMeta {
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %s, %s, %s, %d, %d, %d}",
+ "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d}",
fileName,
fileSize,
rowCount,
minKey,
maxKey,
- Arrays.toString(stats),
+ Arrays.toString(keyStats),
+ Arrays.toString(valueStats),
minSequenceNumber,
maxSequenceNumber,
level);
@@ -177,7 +187,9 @@ public class SstFileMeta {
fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
fields.add(new RowType.RowField("_MIN_KEY", keyType));
fields.add(new RowType.RowField("_MAX_KEY", keyType));
- fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(valueType)));
+ fields.add(new RowType.RowField("_KEY_STATS", FieldStatsArraySerializer.schema(keyType)));
+ fields.add(
+ new RowType.RowField("_VALUE_STATS", FieldStatsArraySerializer.schema(valueType)));
fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
index d03a71e..9e4dbe6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
@@ -32,12 +32,14 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
private static final long serialVersionUID = 1L;
private final RowDataSerializer keySerializer;
- private final FieldStatsArraySerializer statsArraySerializer;
+ private final FieldStatsArraySerializer keyStatsArraySerializer;
+ private final FieldStatsArraySerializer valueStatsArraySerializer;
public SstFileMetaSerializer(RowType keyType, RowType valueType) {
super(SstFileMeta.schema(keyType, valueType));
this.keySerializer = new RowDataSerializer(keyType);
- this.statsArraySerializer = new FieldStatsArraySerializer(valueType);
+ this.keyStatsArraySerializer = new FieldStatsArraySerializer(keyType);
+ this.valueStatsArraySerializer = new FieldStatsArraySerializer(valueType);
}
@Override
@@ -48,7 +50,8 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
meta.rowCount(),
meta.minKey(),
meta.maxKey(),
- statsArraySerializer.toRow(meta.stats()),
+ keyStatsArraySerializer.toRow(meta.keyStats()),
+ valueStatsArraySerializer.toRow(meta.valueStats()),
meta.minSequenceNumber(),
meta.maxSequenceNumber(),
meta.level());
@@ -63,9 +66,11 @@ public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
row.getLong(2),
keySerializer.toBinaryRow(row.getRow(3, keyFieldCount)).copy(),
keySerializer.toBinaryRow(row.getRow(4, keyFieldCount)).copy(),
- statsArraySerializer.fromRow(row.getRow(5, statsArraySerializer.numFields())),
- row.getLong(6),
+ keyStatsArraySerializer.fromRow(row.getRow(5, keyStatsArraySerializer.numFields())),
+ valueStatsArraySerializer.fromRow(
+ row.getRow(6, valueStatsArraySerializer.numFields())),
row.getLong(7),
- row.getInt(8));
+ row.getLong(8),
+ row.getInt(9));
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
index f9ce7f3..b3c0a59 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileReader.java
@@ -25,9 +25,9 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RecordReader;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
index 5c18aa6..afc7e19 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileWriter.java
@@ -25,10 +25,12 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RollingFile;
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/** Writes {@link KeyValue}s into sst files. */
@@ -50,6 +53,7 @@ public class SstFileWriter {
private final RowType keyType;
private final RowType valueType;
private final BulkWriter.Factory<RowData> writerFactory;
+ private final FileStatsExtractor fileStatsExtractor;
private final SstPathFactory pathFactory;
private final long suggestedFileSize;
@@ -57,11 +61,13 @@ public class SstFileWriter {
RowType keyType,
RowType valueType,
BulkWriter.Factory<RowData> writerFactory,
+ FileStatsExtractor fileStatsExtractor,
SstPathFactory pathFactory,
long suggestedFileSize) {
this.keyType = keyType;
this.valueType = valueType;
this.writerFactory = writerFactory;
+ this.fileStatsExtractor = fileStatsExtractor;
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
}
@@ -91,7 +97,10 @@ public class SstFileWriter {
*/
public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
throws Exception {
- SstRollingFile rollingFile = new StatsCollectingRollingFile(level);
+ SstRollingFile rollingFile =
+ fileStatsExtractor == null
+ ? new StatsCollectingRollingFile(level)
+ : new FileExtractingRollingFile(level);
List<SstFileMeta> result = new ArrayList<>();
List<Path> filesToCleanUp = new ArrayList<>();
try {
@@ -161,6 +170,7 @@ public class SstFileWriter {
@Override
protected SstFileMeta collectFile(Path path) throws IOException {
+ KeyAndValueStats stats = extractStats(path);
SstFileMeta result =
new SstFileMeta(
path.getName(),
@@ -168,7 +178,8 @@ public class SstFileWriter {
rowCount,
minKey,
keySerializer.toBinaryRow(maxKey).copy(),
- collectStats(path),
+ stats.keyStats,
+ stats.valueStats,
minSequenceNumber,
maxSequenceNumber,
level);
@@ -176,7 +187,7 @@ public class SstFileWriter {
return result;
}
- private void resetMeta() {
+ protected void resetMeta() {
rowCount = 0;
minKey = null;
maxKey = null;
@@ -184,26 +195,68 @@ public class SstFileWriter {
maxSequenceNumber = Long.MIN_VALUE;
}
- protected abstract FieldStats[] collectStats(Path path);
+ protected abstract KeyAndValueStats extractStats(Path path);
+ }
+
+ private class FileExtractingRollingFile extends SstRollingFile {
+
+ private FileExtractingRollingFile(int level) {
+ super(level);
+ }
+
+ @Override
+ protected KeyAndValueStats extractStats(Path path) {
+ FieldStats[] rawStats;
+ try {
+ rawStats = fileStatsExtractor.extract(path);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ int numKeyFields = keyType.getFieldCount();
+ return new KeyAndValueStats(
+ Arrays.copyOfRange(rawStats, 0, numKeyFields),
+ Arrays.copyOfRange(rawStats, numKeyFields + 2, rawStats.length));
+ }
}
private class StatsCollectingRollingFile extends SstRollingFile {
+ private FieldStatsCollector keyStatsCollector;
+ private FieldStatsCollector valueStatsCollector;
+
private StatsCollectingRollingFile(int level) {
super(level);
}
@Override
- protected FieldStats[] collectStats(Path path) {
- // TODO
- // 1. Read statistics directly from the written orc/parquet files.
- // 2. For other file formats use StatsCollector. Make sure fields are not reused
- // otherwise we need copying.
- FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
- for (int i = 0; i < stats.length; i++) {
- stats[i] = new FieldStats(null, null, 0);
- }
- return stats;
+ protected RowData toRowData(KeyValue kv) {
+ keyStatsCollector.collect(kv.key());
+ valueStatsCollector.collect(kv.value());
+ return super.toRowData(kv);
+ }
+
+ @Override
+ protected KeyAndValueStats extractStats(Path path) {
+ return new KeyAndValueStats(keyStatsCollector.extract(), valueStatsCollector.extract());
+ }
+
+ @Override
+ protected void resetMeta() {
+ super.resetMeta();
+ keyStatsCollector = new FieldStatsCollector(keyType);
+ valueStatsCollector = new FieldStatsCollector(valueType);
+ }
+ }
+
+ private static class KeyAndValueStats {
+
+ private final FieldStats[] keyStats;
+ private final FieldStats[] valueStats;
+
+ private KeyAndValueStats(FieldStats[] keyStats, FieldStats[] valueStats) {
+ this.keyStats = keyStats;
+ this.valueStats = valueStats;
}
}
@@ -213,6 +266,7 @@ public class SstFileWriter {
private final RowType keyType;
private final RowType valueType;
private final BulkWriter.Factory<RowData> writerFactory;
+ private final FileStatsExtractor fileStatsExtractor;
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
@@ -226,6 +280,7 @@ public class SstFileWriter {
this.valueType = valueType;
RowType recordType = KeyValue.schema(keyType, valueType);
this.writerFactory = fileFormat.createWriterFactory(recordType);
+ this.fileStatsExtractor = fileFormat.createStatsExtractor(recordType).orElse(null);
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
}
@@ -235,6 +290,7 @@ public class SstFileWriter {
keyType,
valueType,
writerFactory,
+ fileStatsExtractor,
pathFactory.createSstPathFactory(partition, bucket),
suggestedFileSize);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
index 0df5aa7..2d69773 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
index 7bee391..441068a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
index 89ed0f9..8acb126 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
@@ -18,7 +18,9 @@
package org.apache.flink.table.store.file.stats;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.RowType;
@@ -29,6 +31,7 @@ public class FieldStatsCollector {
private final Object[] maxValues;
private final long[] nullCounts;
private final RowDataToObjectArrayConverter converter;
+ private final TypeSerializer<Object>[] fieldSerializers;
public FieldStatsCollector(RowType rowType) {
int numFields = rowType.getFieldCount();
@@ -36,6 +39,10 @@ public class FieldStatsCollector {
this.maxValues = new Object[numFields];
this.nullCounts = new long[numFields];
this.converter = new RowDataToObjectArrayConverter(rowType);
+ this.fieldSerializers = new TypeSerializer[numFields];
+ for (int i = 0; i < numFields; i++) {
+ fieldSerializers[i] = InternalSerializers.create(rowType.getTypeAt(i));
+ }
}
/**
@@ -59,10 +66,10 @@ public class FieldStatsCollector {
}
Comparable<Object> c = (Comparable<Object>) obj;
if (minValues[i] == null || c.compareTo(minValues[i]) < 0) {
- minValues[i] = c;
+ minValues[i] = fieldSerializers[i].copy(c);
}
if (maxValues[i] == null || c.compareTo(maxValues[i]) > 0) {
- maxValues[i] = c;
+ maxValues[i] = fieldSerializers[i].copy(c);
}
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
new file mode 100644
index 0000000..3f56d3b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FileStatsExtractor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/** Extracts statistics directly from file. */
+public interface FileStatsExtractor {
+
+ FieldStats[] extract(Path path) throws IOException;
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
index 32eb230..9102e6f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/FileFormatTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormatImpl;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
similarity index 59%
copy from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
copy to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
index a6ff06b..77e9807 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormat.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
@@ -24,19 +24,21 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.stats.TestFileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
-import java.io.IOException;
import java.util.List;
+import java.util.Optional;
-/** A special avro {@link FileFormat} which flushes for every added element. */
-public class FlushingAvroFormat implements FileFormat {
+/** An avro {@link FileFormat} for test. It provides a {@link FileStatsExtractor}. */
+public class FileStatsExtractingAvroFormat implements FileFormat {
private final FileFormat avro =
FileFormat.fromIdentifier(
- SstFileTest.class.getClassLoader(), "avro", new Configuration());
+ FileStatsExtractingAvroFormat.class.getClassLoader(),
+ "avro",
+ new Configuration());
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
@@ -46,25 +48,11 @@ public class FlushingAvroFormat implements FileFormat {
@Override
public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
- return fsDataOutputStream -> {
- BulkWriter<RowData> wrapped = avro.createWriterFactory(type).create(fsDataOutputStream);
- return new BulkWriter<RowData>() {
- @Override
- public void addElement(RowData rowData) throws IOException {
- wrapped.addElement(rowData);
- wrapped.flush();
- }
-
- @Override
- public void flush() throws IOException {
- wrapped.flush();
- }
+ return avro.createWriterFactory(type);
+ }
- @Override
- public void finish() throws IOException {
- wrapped.finish();
- }
- };
- };
+ @Override
+ public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+ return Optional.of(new TestFileStatsExtractor(avro, type));
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
new file mode 100644
index 0000000..1b8fdef
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileStatsExtractingAvroFormatFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.format;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link FileStatsExtractingAvroFormat}. */
+public class FileStatsExtractingAvroFormatFactory implements FileFormatFactory {
+
+ @Override
+ public String identifier() {
+ return "avro-extract";
+ }
+
+ @Override
+ public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
+ return new FileStatsExtractingAvroFormat();
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
similarity index 76%
copy from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
copy to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
index a6ff06b..fb8f90c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FlushingFileFormat.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.format;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.Configuration;
@@ -24,30 +24,33 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
import java.util.List;
-/** A special avro {@link FileFormat} which flushes for every added element. */
-public class FlushingAvroFormat implements FileFormat {
+/** A special {@link FileFormat} which flushes for every added element. */
+public class FlushingFileFormat implements FileFormat {
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- SstFileTest.class.getClassLoader(), "avro", new Configuration());
+ private final FileFormat format;
+
+ public FlushingFileFormat(String identifier) {
+ this.format =
+ FileFormat.fromIdentifier(
+ FlushingFileFormat.class.getClassLoader(), identifier, new Configuration());
+ }
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
RowType type, int[][] projection, List<ResolvedExpression> filters) {
- return avro.createReaderFactory(type, projection, filters);
+ return format.createReaderFactory(type, projection, filters);
}
@Override
public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
return fsDataOutputStream -> {
- BulkWriter<RowData> wrapped = avro.createWriterFactory(type).create(fsDataOutputStream);
+ BulkWriter<RowData> wrapped =
+ format.createWriterFactory(type).create(fsDataOutputStream);
return new BulkWriter<RowData>() {
@Override
public void addElement(RowData rowData) throws IOException {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 10b06c3..8f8bd8c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -90,6 +90,7 @@ public class ManifestCommittableSerializerTest {
public static SstFileMeta newFile(int name, int level) {
FieldStats[] stats = new FieldStats[] {new FieldStats(0, 1, 0)};
- return new SstFileMeta(String.valueOf(name), 0, 1, row(0), row(0), stats, 0, 1, level);
+ return new SstFileMeta(
+ String.valueOf(name), 0, 1, row(0), row(0), stats, stats, 0, 1, level);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 9296a31..51084d0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.stats.FieldStats;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
@@ -240,6 +240,7 @@ public class ManifestFileMetaTest {
binaryRowData, // not used
binaryRowData, // not used
new FieldStats[] {new FieldStats(null, null, 0)}, // not used
+ new FieldStats[] {new FieldStats(null, null, 0)}, // not used
0, // not used
0, // not used
0 // not used
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index f1c8c78..5087125 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -21,9 +21,8 @@ package org.apache.flink.table.store.file.manifest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -124,11 +123,9 @@ public class ManifestFileTest {
// check stats
for (int i = 0; i < expected.partitionStats().length; i++) {
- List<FieldStats> actualStats = new ArrayList<>();
- for (ManifestFileMeta meta : actual) {
- actualStats.add(meta.partitionStats()[i]);
- }
- StatsTestUtils.checkRollingFileStats(expected.partitionStats()[i], actualStats);
+ int idx = i;
+ StatsTestUtils.checkRollingFileStats(
+ expected.partitionStats()[i], actual, meta -> meta.partitionStats()[idx]);
}
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index 39f2529..a011454 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.table.store.file.manifest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
index 3f6a89b..d2a2059 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -62,6 +62,6 @@ public class LevelsTest {
}
public static SstFileMeta newFile(int level) {
- return new SstFileMeta("", 0, 1, row(0), row(0), null, 0, 1, level);
+ return new SstFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, level);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index c6b86b9..18b84cc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -25,9 +25,10 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FlushingFileFormat;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
@@ -37,7 +38,6 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.mergetree.sst.SstFileReader;
import org.apache.flink.table.store.file.mergetree.sst.SstFileWriter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.RecordWriter;
@@ -99,7 +99,7 @@ public class MergeTreeTest {
options = new MergeTreeOptions(configuration);
RowType keyType = new RowType(singletonList(new RowType.RowField("k", new IntType())));
RowType valueType = new RowType(singletonList(new RowType.RowField("v", new IntType())));
- FileFormat flushingAvro = new FlushingAvroFormat();
+ FileFormat flushingAvro = new FlushingFileFormat("avro");
sstFileReader =
new SstFileReader.Factory(keyType, valueType, flushingAvro, pathFactory)
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 49c3ca5..98687e4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -205,7 +205,16 @@ public class CompactManagerTest {
private static SstFileMeta newFile(int level, int minKey, int maxKey, long maxSequence) {
return new SstFileMeta(
- "", maxKey - minKey + 1, 1, row(minKey), row(maxKey), null, 0, maxSequence, level);
+ "",
+ maxKey - minKey + 1,
+ 1,
+ row(minKey),
+ row(maxKey),
+ null,
+ null,
+ 0,
+ maxSequence,
+ level);
}
public static BinaryRowData row(int i) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
index f59c1c2..db56649 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/IntervalPartitionTest.java
@@ -172,6 +172,7 @@ public class IntervalPartitionTest {
minKey,
maxKey,
new FieldStats[] {new FieldStats(left, right, 0)},
+ new FieldStats[] {new FieldStats(null, null, 0)}, // not used
0,
24,
0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index 17ae39a..c7678d7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -223,6 +223,6 @@ public class UniversalCompactionTest {
}
private SstFileMeta file(long size) {
- return new SstFileMeta("", size, 1, null, null, null, 0, 0, 0);
+ return new SstFileMeta("", size, 1, null, null, null, null, 0, 0, 0);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index 00f44e7..3e5acd6 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -24,13 +24,13 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializerTest;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.FlushingAvroFormat;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
@@ -56,14 +56,22 @@ public class SstFileTest {
private final SstTestDataGenerator gen =
SstTestDataGenerator.builder().memTableCapacity(20).build();
- private final FileFormat flushingAvro = new FlushingAvroFormat();
@TempDir java.nio.file.Path tempDir;
@RepeatedTest(10)
- public void testWriteAndReadSstFile() throws Exception {
+ public void testWriteAndReadSstFileWithStatsCollectingRollingFile() throws Exception {
+ testWriteAndReadSstFileImpl("avro");
+ }
+
+ @RepeatedTest(10)
+ public void testWriteAndReadSstFileWithFileExtractingRollingFile() throws Exception {
+ testWriteAndReadSstFileImpl("avro-extract");
+ }
+
+ private void testWriteAndReadSstFileImpl(String format) throws Exception {
SstTestDataGenerator.Data data = gen.next();
- SstFileWriter writer = createSstFileWriter(tempDir.toString());
+ SstFileWriter writer = createSstFileWriter(tempDir.toString(), format);
SstFileMetaSerializer serializer =
new SstFileMetaSerializer(
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE);
@@ -73,7 +81,7 @@ public class SstFileTest {
checkRollingFiles(data.meta, actualMetas, writer.suggestedFileSize());
- SstFileReader reader = createSstFileReader(tempDir.toString(), null, null);
+ SstFileReader reader = createSstFileReader(tempDir.toString(), format, null, null);
assertData(
data,
actualMetas,
@@ -90,7 +98,7 @@ public class SstFileTest {
SstTestDataGenerator.Data data = gen.next();
SstFileWriter writer =
createSstFileWriter(
- FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()));
+ FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString()), "avro");
try {
writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
@@ -109,7 +117,7 @@ public class SstFileTest {
@Test
public void testKeyProjection() throws Exception {
SstTestDataGenerator.Data data = gen.next();
- SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+ SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString(), "avro");
SstFileMetaSerializer serializer =
new SstFileMetaSerializer(
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE);
@@ -118,7 +126,7 @@ public class SstFileTest {
// projection: (shopId, orderId) -> (orderId)
SstFileReader sstFileReader =
- createSstFileReader(tempDir.toString(), new int[][] {new int[] {1}}, null);
+ createSstFileReader(tempDir.toString(), "avro", new int[][] {new int[] {1}}, null);
RowType projectedKeyType =
RowType.of(new LogicalType[] {new BigIntType(false)}, new String[] {"key_orderId"});
RowDataSerializer projectedKeySerializer = new RowDataSerializer(projectedKeyType);
@@ -141,7 +149,7 @@ public class SstFileTest {
@Test
public void testValueProjection() throws Exception {
SstTestDataGenerator.Data data = gen.next();
- SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString());
+ SstFileWriter sstFileWriter = createSstFileWriter(tempDir.toString(), "avro");
SstFileMetaSerializer serializer =
new SstFileMetaSerializer(
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE);
@@ -154,6 +162,7 @@ public class SstFileTest {
SstFileReader sstFileReader =
createSstFileReader(
tempDir.toString(),
+ "avro",
null,
new int[][] {new int[] {2}, new int[] {4}, new int[] {0}, new int[] {1}});
RowType projectedValueType =
@@ -188,29 +197,29 @@ public class SstFileTest {
kv.value().getInt(1))));
}
- private SstFileWriter createSstFileWriter(String path) {
+ private SstFileWriter createSstFileWriter(String path, String format) {
FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
return new SstFileWriter.Factory(
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
- // normal avro format will buffer changes in memory and we can't determine
+ // normal format will buffer changes in memory and we can't determine
// if the written file size is really larger than suggested, so we use a
- // special avro format which flushes for every added element
- flushingAvro,
+ // special format which flushes for every added element
+ new FlushingFileFormat(format),
pathFactory,
suggestedFileSize)
.create(BinaryRowDataUtil.EMPTY_ROW, 0);
}
private SstFileReader createSstFileReader(
- String path, int[][] keyProjection, int[][] valueProjection) {
+ String path, String format, int[][] keyProjection, int[][] valueProjection) {
FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
SstFileReader.Factory factory =
new SstFileReader.Factory(
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
- flushingAvro,
+ new FlushingFileFormat(format),
pathFactory);
if (keyProjection != null) {
factory.withKeyProjection(keyProjection);
@@ -272,16 +281,17 @@ public class SstFileTest {
// expected.maxKey == lastFile.maxKey
assertThat(actual.get(actual.size() - 1).maxKey()).isEqualTo(expected.maxKey());
- // TODO check stats after they're collected
- /*
- for (int i = 0; i < expected.stats().length; i++) {
- List<FieldStats> actualStats = new ArrayList<>();
- for (SstFileMeta meta : actual) {
- actualStats.add(meta.stats()[i]);
- }
- checkRollingFileStats(expected.stats()[i], actualStats);
+ // check stats
+ for (int i = 0; i < expected.keyStats().length; i++) {
+ int idx = i;
+ StatsTestUtils.checkRollingFileStats(
+ expected.keyStats()[i], actual, m -> m.keyStats()[idx]);
+ }
+ for (int i = 0; i < expected.valueStats().length; i++) {
+ int idx = i;
+ StatsTestUtils.checkRollingFileStats(
+ expected.valueStats()[i], actual, m -> m.valueStats()[idx]);
}
- */
// expected.minSequenceNumber == min(minSequenceNumber)
assertThat(actual.stream().mapToLong(SstFileMeta::minSequenceNumber).min().orElse(-1))
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
index bb39d7d..5d5edc4 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
@@ -98,7 +98,10 @@ public class SstTestDataGenerator {
}
private Data createSstFile(List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
- FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+ FieldStatsCollector keyStatsCollector =
+ new FieldStatsCollector(TestKeyValueGenerator.KEY_TYPE);
+ FieldStatsCollector valueStatsCollector =
+ new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
long totalSize = 0;
BinaryRowData minKey = null;
BinaryRowData maxKey = null;
@@ -108,7 +111,8 @@ public class SstTestDataGenerator {
BinaryRowData key = (BinaryRowData) kv.key();
BinaryRowData value = (BinaryRowData) kv.value();
totalSize += key.getSizeInBytes() + value.getSizeInBytes();
- collector.collect(value);
+ keyStatsCollector.collect(key);
+ valueStatsCollector.collect(value);
if (minKey == null || TestKeyValueGenerator.KEY_COMPARATOR.compare(key, minKey) < 0) {
minKey = key;
}
@@ -128,7 +132,8 @@ public class SstTestDataGenerator {
kvs.size(),
minKey,
maxKey,
- collector.extract(),
+ keyStatsCollector.extract(),
+ valueStatsCollector.extract(),
minSequenceNumber,
maxSequenceNumber,
level),
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
new file mode 100644
index 0000000..b1143ec
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FileStatsExtractorTestBase.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStatsExtractor}. */
+public abstract class FileStatsExtractorTestBase {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testExtract() throws Exception {
+ FileFormat format = createFormat();
+ RowType rowType = rowType();
+
+ BulkWriter.Factory<RowData> writerFactory = format.createWriterFactory(rowType);
+ Path path = new Path(tempDir.toString() + "/test");
+ FSDataOutputStream out =
+ path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+ BulkWriter<RowData> writer = writerFactory.create(out);
+
+ List<GenericRowData> data = createData(rowType);
+ for (GenericRowData row : data) {
+ writer.addElement(row);
+ }
+ writer.finish();
+
+ FieldStatsCollector collector = new FieldStatsCollector(rowType);
+ for (GenericRowData row : data) {
+ collector.collect(row);
+ }
+ FieldStats[] expected = collector.extract();
+
+ FileStatsExtractor extractor = format.createStatsExtractor(rowType).get();
+ assertThat(extractor).isNotNull();
+ FieldStats[] actual = extractor.extract(path);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private List<GenericRowData> createData(RowType rowType) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numRows = random.nextInt(100);
+ List<List<Object>> columns = new ArrayList<>();
+ for (RowType.RowField field : rowType.getFields()) {
+ List<Object> column = new ArrayList<>();
+ int numValues = random.nextInt(numRows + 1);
+ for (int i = 0; i < numValues; i++) {
+ column.add(createField(field.getType()));
+ }
+ columns.add(column);
+ }
+ return createData(numRows, columns);
+ }
+
+ private List<GenericRowData> createData(int numRows, List<List<Object>> columns) {
+ List<GenericRowData> rows = new ArrayList<>();
+ for (int i = 0; i < numRows; i++) {
+ rows.add(new GenericRowData(columns.size()));
+ }
+ for (int i = 0; i < columns.size(); i++) {
+ List<?> objects = new ArrayList<>(columns.get(i));
+ while (objects.size() < numRows) {
+ objects.add(null);
+ }
+ Collections.shuffle(objects);
+ for (int j = 0; j < numRows; j++) {
+ rows.get(j).setField(i, objects.get(j));
+ }
+ }
+ return rows;
+ }
+
+ private Object createField(LogicalType type) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ CharType charType = (CharType) type;
+ return BinaryStringData.fromString(randomString(charType.getLength()));
+ case VARCHAR:
+ VarCharType varCharType = (VarCharType) type;
+ return BinaryStringData.fromString(
+ randomString(random.nextInt(varCharType.getLength()) + 1));
+ case BOOLEAN:
+ return random.nextBoolean();
+ case TINYINT:
+ return (byte) random.nextInt(10);
+ case SMALLINT:
+ return (short) random.nextInt(100);
+ case INTEGER:
+ return random.nextInt(1000);
+ case BIGINT:
+ return random.nextLong(10000);
+ case FLOAT:
+ return random.nextFloat();
+ case DOUBLE:
+ return random.nextDouble();
+ case DECIMAL:
+ return randomDecimalData((DecimalType) type);
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return random.nextInt(10000);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return randomTimestampData((TimestampType) type);
+ case ARRAY:
+ return randomArray((ArrayType) type);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported type " + type.asSummaryString());
+ }
+ }
+
+ private String randomString(int length) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ builder.append((char) ThreadLocalRandom.current().nextInt(127 - 32));
+ }
+ return builder.toString();
+ }
+
+ private DecimalData randomDecimalData(DecimalType type) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int p = type.getPrecision();
+ int s = type.getScale();
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < p - s; i++) {
+ builder.append((char) (random.nextInt(10) + '0'));
+ }
+ if (s > 0) {
+ builder.append('.');
+ for (int i = 0; i < s; i++) {
+ builder.append((char) (random.nextInt(10) + '0'));
+ }
+ }
+ return DecimalData.fromBigDecimal(new BigDecimal(builder.toString()), p, s);
+ }
+
+ private TimestampData randomTimestampData(TimestampType type) {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ long p = 1;
+ for (int i = type.getPrecision(); i < TimestampType.MAX_PRECISION; i++) {
+ p *= 10;
+ }
+ long currentSecond = System.currentTimeMillis() / 1000;
+ return TimestampData.fromInstant(
+ Instant.ofEpochSecond(
+ random.nextLong(currentSecond), random.nextLong(1_000_000_000) / p * p));
+ }
+
+ private ArrayData randomArray(ArrayType type) {
+ int length = ThreadLocalRandom.current().nextInt(10);
+ Object[] javaArray = new Object[length];
+ for (int i = 0; i < length; i++) {
+ javaArray[i] = createField(type.getElementType());
+ }
+ return new GenericArrayData(javaArray);
+ }
+
+ protected abstract FileFormat createFormat();
+
+ protected abstract RowType rowType();
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
index 379b600..4ce3255 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/StatsTestUtils.java
@@ -18,7 +18,9 @@
package org.apache.flink.table.store.file.stats;
+import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
@@ -26,7 +28,12 @@ import static org.assertj.core.api.Assertions.assertThat;
public class StatsTestUtils {
@SuppressWarnings("unchecked")
- public static void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
+ public static <T> void checkRollingFileStats(
+ FieldStats expected, List<T> actualObjects, Function<T, FieldStats> mapToStats) {
+ List<FieldStats> actual = new ArrayList<>();
+ for (T object : actualObjects) {
+ actual.add(mapToStats.apply(object));
+ }
if (expected.minValue() instanceof Comparable) {
Object actualMin = null;
Object actualMax = null;
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
new file mode 100644
index 0000000..751ea18
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/TestFileStatsExtractor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link FileStatsExtractor} for test. It reads all records from the file and uses {@link
+ * FieldStatsCollector} to collect the stats.
+ */
+public class TestFileStatsExtractor implements FileStatsExtractor {
+
+ private final FileFormat format;
+ private final RowType rowType;
+
+ public TestFileStatsExtractor(FileFormat format, RowType rowType) {
+ this.format = format;
+ this.rowType = rowType;
+ }
+
+ @Override
+ public FieldStats[] extract(Path path) throws IOException {
+ IdentityObjectSerializer serializer = new IdentityObjectSerializer(rowType);
+ BulkFormat<RowData, FileSourceSplit> readerFactory = format.createReaderFactory(rowType);
+ List<RowData> records = FileUtils.readListFromFile(path, serializer, readerFactory);
+ FieldStatsCollector statsCollector = new FieldStatsCollector(rowType);
+ for (RowData record : records) {
+ statsCollector.collect(record);
+ }
+ return statsCollector.extract();
+ }
+
+ private static class IdentityObjectSerializer extends ObjectSerializer<RowData> {
+
+ public IdentityObjectSerializer(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public RowData toRow(RowData record) {
+ return record;
+ }
+
+ @Override
+ public RowData fromRow(RowData rowData) {
+ return rowData;
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
new file mode 100644
index 0000000..7bf0c99
--- /dev/null
+++
b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.file.format.FileStatsExtractingAvroFormatFactory
\ No newline at end of file
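
The service file above registers FileStatsExtractingAvroFormatFactory so that FileFormat.fromIdentifier can discover it at runtime. The lookup itself is not shown in this diff; the sketch below only illustrates how such a ServiceLoader-based resolution typically works under the new FileFormatFactory interface (the class FileFormatLookup and its method name are made up for illustration, not part of the commit):

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.store.file.format.FileFormat;
    import org.apache.flink.table.store.file.format.FileFormatFactory;

    import java.util.ServiceLoader;

    /** Illustrative sketch only; the real lookup lives in the table store's format package. */
    public final class FileFormatLookup {

        public static FileFormat load(ClassLoader classLoader, String identifier, ReadableConfig options) {
            // ServiceLoader scans every META-INF/services/...FileFormatFactory file on the classpath
            for (FileFormatFactory factory : ServiceLoader.load(FileFormatFactory.class, classLoader)) {
                if (factory.identifier().equals(identifier)) {
                    return factory.create(classLoader, options);
                }
            }
            throw new IllegalArgumentException("No FileFormatFactory found for identifier " + identifier);
        }
    }
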
diff --git a/flink-table-store-orc/pom.xml b/flink-table-store-orc/pom.xml
new file mode 100644
index 0000000..a7a6da8
--- /dev/null
+++ b/flink-table-store-orc/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-orc</artifactId>
+ <name>Flink Table Store : Orc</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- flink dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-orc</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
similarity index 50%
rename from
flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
rename to
flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
index a6ff06b..0087cc3 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FlushingAvroFormat.java
+++
b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormat.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,55 +16,44 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.orc;
import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormatImpl;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
import org.apache.flink.table.types.logical.RowType;
-import java.io.IOException;
import java.util.List;
+import java.util.Optional;
-/** A special avro {@link FileFormat} which flushes for every added element. */
-public class FlushingAvroFormat implements FileFormat {
+/** Orc {@link FileFormat}. */
+public class OrcFileFormat implements FileFormat {
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- SstFileTest.class.getClassLoader(), "avro", new
Configuration());
+ private final FileFormatImpl format;
+
+ public OrcFileFormat(ClassLoader classLoader, ReadableConfig formatOptions) {
+ this.format = new FileFormatImpl(classLoader, "orc", formatOptions);
+ }
@Override
public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
RowType type, int[][] projection, List<ResolvedExpression> filters) {
- return avro.createReaderFactory(type, projection, filters);
+ return format.createReaderFactory(type, projection, filters);
}
@Override
public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
- return fsDataOutputStream -> {
- BulkWriter<RowData> wrapped = avro.createWriterFactory(type).create(fsDataOutputStream);
- return new BulkWriter<RowData>() {
- @Override
- public void addElement(RowData rowData) throws IOException {
- wrapped.addElement(rowData);
- wrapped.flush();
- }
-
- @Override
- public void flush() throws IOException {
- wrapped.flush();
- }
+ return format.createWriterFactory(type);
+ }
- @Override
- public void finish() throws IOException {
- wrapped.finish();
- }
- };
- };
+ @Override
+ public Optional<FileStatsExtractor> createStatsExtractor(RowType type) {
+ return Optional.of(new OrcFileStatsExtractor(type));
}
}
diff --git
a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
new file mode 100644
index 0000000..c31dd10
--- /dev/null
+++
b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileFormatFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.orc;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.format.FileFormatFactory;
+
+/** Factory to create {@link OrcFileFormat}. */
+public class OrcFileFormatFactory implements FileFormatFactory {
+
+ @Override
+ public String identifier() {
+ return "orc";
+ }
+
+ @Override
+ public FileFormat create(ClassLoader classLoader, ReadableConfig formatOptions) {
+ return new OrcFileFormat(classLoader, formatOptions);
+ }
+}
diff --git
a/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
new file mode 100644
index 0000000..8d321af
--- /dev/null
+++
b/flink-table-store-orc/src/main/java/org/apache/flink/table/store/orc/OrcFileStatsExtractor.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.orc;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.BooleanColumnStatistics;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.DateColumnStatistics;
+import org.apache.orc.DecimalColumnStatistics;
+import org.apache.orc.DoubleColumnStatistics;
+import org.apache.orc.IntegerColumnStatistics;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TimestampColumnStatistics;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** {@link FileStatsExtractor} for orc files. */
+public class OrcFileStatsExtractor implements FileStatsExtractor {
+
+ private final RowType rowType;
+
+ public OrcFileStatsExtractor(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ @Override
+ public FieldStats[] extract(Path path) throws IOException {
+ org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(path.toUri());
+ Reader reader =
+ OrcFile.createReader(hadoopPath, OrcFile.readerOptions(new Configuration()));
+
+ long rowCount = reader.getNumberOfRows();
+ ColumnStatistics[] columnStatistics = reader.getStatistics();
+ TypeDescription schema = reader.getSchema();
+ List<String> columnNames = schema.getFieldNames();
+ List<TypeDescription> columnTypes = schema.getChildren();
+
+ return IntStream.range(0, rowType.getFieldCount())
+ .mapToObj(
+ i -> {
+ RowType.RowField field = rowType.getFields().get(i);
+ int fieldIdx = columnNames.indexOf(field.getName());
+ int colId = columnTypes.get(fieldIdx).getId();
+ return toFieldStats(field, columnStatistics[colId], rowCount);
+ })
+ .toArray(FieldStats[]::new);
+ }
+
+ private FieldStats toFieldStats(RowType.RowField field, ColumnStatistics stats, long rowCount) {
+ long nullCount = rowCount - stats.getNumberOfValues();
+ if (nullCount == rowCount) {
+ // all nulls
+ return new FieldStats(null, null, nullCount);
+ }
+ Preconditions.checkState(
+ (nullCount > 0) == stats.hasNull(),
+ "Bug in OrcFileStatsExtractor: nullCount is "
+ + nullCount
+ + " while stats.hasNull() is "
+ + stats.hasNull()
+ + "!");
+
+ switch (field.getType().getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ assertStatsClass(field, stats, StringColumnStatistics.class);
+ StringColumnStatistics stringStats = (StringColumnStatistics) stats;
+ return new FieldStats(
+ StringData.fromString(stringStats.getMinimum()),
+ StringData.fromString(stringStats.getMaximum()),
+ nullCount);
+ case BOOLEAN:
+ assertStatsClass(field, stats, BooleanColumnStatistics.class);
+ BooleanColumnStatistics boolStats = (BooleanColumnStatistics)
stats;
+ return new FieldStats(
+ boolStats.getFalseCount() == 0,
boolStats.getTrueCount() != 0, nullCount);
+ case DECIMAL:
+ assertStatsClass(field, stats, DecimalColumnStatistics.class);
+ DecimalColumnStatistics decimalStats = (DecimalColumnStatistics) stats;
+ DecimalType decimalType = (DecimalType) (field.getType());
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ return new FieldStats(
+ DecimalData.fromBigDecimal(
+ decimalStats.getMinimum().bigDecimalValue(), precision, scale),
+ DecimalData.fromBigDecimal(
+ decimalStats.getMaximum().bigDecimalValue(), precision, scale),
+ nullCount);
+ case TINYINT:
+ assertStatsClass(field, stats, IntegerColumnStatistics.class);
+ IntegerColumnStatistics byteStats = (IntegerColumnStatistics) stats;
+ return new FieldStats(
+ (byte) byteStats.getMinimum(), (byte) byteStats.getMaximum(), nullCount);
+ case SMALLINT:
+ assertStatsClass(field, stats, IntegerColumnStatistics.class);
+ IntegerColumnStatistics shortStats = (IntegerColumnStatistics) stats;
+ return new FieldStats(
+ (short) shortStats.getMinimum(),
+ (short) shortStats.getMaximum(),
+ nullCount);
+ case INTEGER:
+ assertStatsClass(field, stats, IntegerColumnStatistics.class);
+ IntegerColumnStatistics intStats = (IntegerColumnStatistics) stats;
+ return new FieldStats(
+ Long.valueOf(intStats.getMinimum()).intValue(),
+ Long.valueOf(intStats.getMaximum()).intValue(),
+ nullCount);
+ case BIGINT:
+ assertStatsClass(field, stats, IntegerColumnStatistics.class);
+ IntegerColumnStatistics longStats = (IntegerColumnStatistics) stats;
+ return new FieldStats(longStats.getMinimum(), longStats.getMaximum(), nullCount);
+ case FLOAT:
+ assertStatsClass(field, stats, DoubleColumnStatistics.class);
+ DoubleColumnStatistics floatStats = (DoubleColumnStatistics) stats;
+ return new FieldStats(
+ (float) floatStats.getMinimum(),
+ (float) floatStats.getMaximum(),
+ nullCount);
+ case DOUBLE:
+ assertStatsClass(field, stats, DoubleColumnStatistics.class);
+ DoubleColumnStatistics doubleStats = (DoubleColumnStatistics) stats;
+ return new FieldStats(
+ doubleStats.getMinimum(), doubleStats.getMaximum(), nullCount);
+ case DATE:
+ assertStatsClass(field, stats, DateColumnStatistics.class);
+ DateColumnStatistics dateStats = (DateColumnStatistics) stats;
+ return new FieldStats(
+ DateTimeUtils.toInternal(new Date(dateStats.getMinimum().getTime())),
+ DateTimeUtils.toInternal(new Date(dateStats.getMaximum().getTime())),
+ nullCount);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ assertStatsClass(field, stats, TimestampColumnStatistics.class);
+ TimestampColumnStatistics timestampStats = (TimestampColumnStatistics) stats;
+ return new FieldStats(
+ TimestampData.fromTimestamp(timestampStats.getMinimum()),
+ TimestampData.fromTimestamp(timestampStats.getMaximum()),
+ nullCount);
+ default:
+ return new FieldStats(null, null, nullCount);
+ }
+ }
+
+ private void assertStatsClass(
+ RowType.RowField field,
+ ColumnStatistics stats,
+ Class<? extends ColumnStatistics> expectedClass) {
+ if (!expectedClass.isInstance(stats)) {
+ throw new IllegalArgumentException(
+ "Expecting "
+ + expectedClass.getName()
+ + " for field "
+ + field.asSummaryString()
+ + " but found "
+ + stats.getClass().getName());
+ }
+ }
+}
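
OrcFileStatsExtractor above reads per-column minimum, maximum and null count directly from the ORC file footer, without scanning the rows. A small usage sketch is shown below; the schema and file path are hypothetical placeholders, not values from this commit:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.store.file.stats.FieldStats;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class OrcStatsExample {
        public static void main(String[] args) throws Exception {
            // hypothetical schema and file; use an ORC file written with the same row type
            RowType rowType = RowType.of(new IntType(), new VarCharType(32));
            OrcFileStatsExtractor extractor = new OrcFileStatsExtractor(rowType);
            FieldStats[] stats = extractor.extract(new Path("/tmp/example.orc"));
            for (FieldStats fieldStats : stats) {
                // each entry carries the min value, max value and null count of one column
                System.out.println(fieldStats);
            }
        }
    }
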
diff --git
a/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
b/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
new file mode 100644
index 0000000..354bcc0
--- /dev/null
+++
b/flink-table-store-orc/src/main/resources/META-INF/services/org.apache.flink.table.store.file.format.FileFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.orc.OrcFileFormatFactory
\ No newline at end of file
diff --git
a/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
b/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
new file mode 100644
index 0000000..82d9271
--- /dev/null
+++
b/flink-table-store-orc/src/test/java/org/apache/flink/table/store/orc/OrcFileStatsExtractorTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.orc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.stats.FileStatsExtractorTestBase;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** Tests for {@link OrcFileStatsExtractor}. */
+public class OrcFileStatsExtractorTest extends FileStatsExtractorTestBase {
+
+ @Override
+ protected FileFormat createFormat() {
+ return FileFormat.fromIdentifier(
+ OrcFileStatsExtractorTest.class.getClassLoader(), "orc", new
Configuration());
+ }
+
+ @Override
+ protected RowType rowType() {
+ return RowType.of(
+ new CharType(8),
+ new VarCharType(8),
+ new BooleanType(),
+ new TinyIntType(),
+ new SmallIntType(),
+ new IntType(),
+ new BigIntType(),
+ new FloatType(),
+ new DoubleType(),
+ new DecimalType(5, 2),
+ new DecimalType(38, 18),
+ new DateType(),
+ new TimestampType(3),
+ // orc reader & writer currently cannot preserve a high precision timestamp
+ // new TimestampType(9),
+ new ArrayType(new IntType()));
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7d048b1..bb5620b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,10 +55,12 @@ under the License.
<module>flink-table-store-core</module>
<module>flink-table-store-connector</module>
<module>flink-table-store-kafka</module>
+ <module>flink-table-store-orc</module>
</modules>
<properties>
<flink.version>1.15-SNAPSHOT</flink.version>
+ <hadoop.version>2.8.5</hadoop.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.15</slf4j.version>
<log4j.version>2.17.1</log4j.version>