This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 157966f6 [FLINK-27843] Schema evolution for data file meta
157966f6 is described below
commit 157966f696522f3c7e939c4cd3713b01c9081017
Author: shammon <[email protected]>
AuthorDate: Fri Nov 18 11:58:29 2022 +0800
[FLINK-27843] Schema evolution for data file meta
This closes #376
---
.../table/store/file/AppendOnlyFileStore.java | 2 +
.../flink/table/store/file/KeyValueFileStore.java | 7 +
.../file/operation/AbstractFileStoreScan.java | 21 +
.../file/operation/AppendOnlyFileStoreScan.java | 32 +-
.../file/operation/KeyValueFileStoreScan.java | 41 +-
.../store/file/schema/KeyFieldsExtractor.java | 33 ++
.../flink/table/store/file/schema/RowDataType.java | 2 +-
.../store/file/schema/SchemaEvolutionUtil.java | 65 +++
.../flink/table/store/file/schema/TableSchema.java | 19 +-
.../file/stats/FieldStatsArraySerializer.java | 18 +-
.../table/ChangelogValueCountFileStoreTable.java | 24 +-
.../table/ChangelogWithKeyFileStoreTable.java | 35 +-
.../flink/table/store/file/TestFileStore.java | 14 +-
.../table/store/file/TestKeyValueGenerator.java | 20 +
.../table/store/file/io/DataFileTestUtils.java | 10 +
.../store/file/operation/FileStoreCommitTest.java | 1 +
.../store/file/operation/FileStoreExpireTest.java | 1 +
.../file/operation/KeyValueFileStoreReadTest.java | 26 +-
.../file/operation/KeyValueFileStoreScanTest.java | 1 +
.../store/file/schema/SchemaEvolutionUtilTest.java | 56 ++
.../file/stats/FieldStatsArraySerializerTest.java | 120 ++++
.../table/AppendOnlyTableFileMetaFilterTest.java | 51 ++
.../ChangelogValueCountFileMetaFilterTest.java | 59 ++
.../table/ChangelogWithKeyFileMetaFilterTest.java | 137 +++++
.../table/store/table/FileMetaFilterTestBase.java | 603 +++++++++++++++++++++
25 files changed, 1373 insertions(+), 25 deletions(-)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 18cc6b36..0d889213 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -76,6 +76,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<RowData> {
bucketKeyType.getFieldCount() == 0 ? rowType : bucketKeyType,
rowType,
snapshotManager(),
+ schemaManager,
+ schemaId,
manifestFileFactory(),
manifestListFactory(),
options.bucket(),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 7f7602b6..29fe95dd 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -24,6 +24,7 @@ import
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +40,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
private final RowType bucketKeyType;
private final RowType keyType;
private final RowType valueType;
+ private final KeyFieldsExtractor keyFieldsExtractor;
private final Supplier<Comparator<RowData>> keyComparatorSupplier;
private final MergeFunction<KeyValue> mergeFunction;
@@ -50,11 +52,13 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
RowType bucketKeyType,
RowType keyType,
RowType valueType,
+ KeyFieldsExtractor keyFieldsExtractor,
MergeFunction<KeyValue> mergeFunction) {
super(schemaManager, schemaId, options, partitionType);
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
this.valueType = valueType;
+ this.keyFieldsExtractor = keyFieldsExtractor;
this.mergeFunction = mergeFunction;
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
}
@@ -99,6 +103,9 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
bucketKeyType,
keyType,
snapshotManager(),
+ schemaManager,
+ schemaId,
+ keyFieldsExtractor,
manifestFileFactory(),
manifestListFactory(),
options.bucket(),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index acb4be11..bd8af8aa 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -29,6 +29,8 @@ import
org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.predicate.BucketSelector;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
@@ -41,7 +43,9 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -59,6 +63,10 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final boolean checkNumOfBuckets;
private final CoreOptions.ChangelogProducer changelogProducer;
+ private final Map<Long, TableSchema> tableSchemas;
+ private final SchemaManager schemaManager;
+ private final long schemaId;
+
private Predicate partitionFilter;
private BucketSelector bucketSelector;
@@ -72,6 +80,8 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
RowType partitionType,
RowType bucketKeyType,
SnapshotManager snapshotManager,
+ SchemaManager schemaManager,
+ long schemaId,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
@@ -83,11 +93,14 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
bucketKeyType.getFieldCount() > 0, "The bucket keys should not
be empty.");
this.bucketKeyType = bucketKeyType;
this.snapshotManager = snapshotManager;
+ this.schemaManager = schemaManager;
+ this.schemaId = schemaId;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
this.numOfBuckets = numOfBuckets;
this.checkNumOfBuckets = checkNumOfBuckets;
this.changelogProducer = changelogProducer;
+ this.tableSchemas = new HashMap<>();
}
@Override
@@ -244,6 +257,14 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
};
}
+ protected TableSchema scanTableSchema() {
+ return scanTableSchema(this.schemaId);
+ }
+
+ protected TableSchema scanTableSchema(long id) {
+ return tableSchemas.computeIfAbsent(id, key ->
schemaManager.schema(id));
+ }
+
private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
return partitionFilter == null
|| partitionFilter.test(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index bdc0389b..717d3a79 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -23,11 +23,16 @@ import
org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
@@ -36,7 +41,7 @@ import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.split
/** {@link FileStoreScan} for {@link
org.apache.flink.table.store.file.AppendOnlyFileStore}. */
public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
- private final FieldStatsArraySerializer rowStatsConverter;
+ private final Map<Long, FieldStatsArraySerializer>
schemaRowStatsConverters;
private final RowType rowType;
private Predicate filter;
@@ -46,6 +51,8 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
RowType bucketKeyType,
RowType rowType,
SnapshotManager snapshotManager,
+ SchemaManager schemaManager,
+ long schemaId,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
@@ -54,12 +61,14 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
partitionType,
bucketKeyType,
snapshotManager,
+ schemaManager,
+ schemaId,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
CoreOptions.ChangelogProducer.NONE);
- this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
+ this.schemaRowStatsConverters = new HashMap<>();
this.rowType = rowType;
}
@@ -84,6 +93,23 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
entry.file().rowCount(),
entry.file()
.valueStats()
- .fields(rowStatsConverter,
entry.file().rowCount()));
+ .fields(
+
getFieldStatsArraySerializer(entry.file().schemaId()),
+ entry.file().rowCount()));
+ }
+
+ private FieldStatsArraySerializer getFieldStatsArraySerializer(long
schemaId) {
+ return schemaRowStatsConverters.computeIfAbsent(
+ schemaId,
+ id -> {
+ TableSchema tableSchema = scanTableSchema();
+ TableSchema schema = scanTableSchema(id);
+ return new FieldStatsArraySerializer(
+ schema.logicalRowType(),
+ tableSchema.id() == id
+ ? null
+ : SchemaEvolutionUtil.createIndexMapping(
+ tableSchema.fields(),
schema.fields()));
+ });
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index bdfac434..5dc88a45 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -23,11 +23,19 @@ import
org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.RowDataType;
+import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
@@ -36,7 +44,8 @@ import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.split
/** {@link FileStoreScan} for {@link
org.apache.flink.table.store.file.KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {
- private final FieldStatsArraySerializer keyStatsConverter;
+ private final Map<Long, FieldStatsArraySerializer>
schemaKeyStatsConverters;
+ private final KeyFieldsExtractor keyFieldsExtractor;
private final RowType keyType;
private Predicate keyFilter;
@@ -46,6 +55,9 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
RowType bucketKeyType,
RowType keyType,
SnapshotManager snapshotManager,
+ SchemaManager schemaManager,
+ long schemaId,
+ KeyFieldsExtractor keyFieldsExtractor,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
int numOfBuckets,
@@ -55,12 +67,15 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
partitionType,
bucketKeyType,
snapshotManager,
+ schemaManager,
+ schemaId,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
changelogProducer);
- this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
+ this.keyFieldsExtractor = keyFieldsExtractor;
+ this.schemaKeyStatsConverters = new HashMap<>();
this.keyType = keyType;
}
@@ -83,6 +98,26 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
return keyFilter == null
|| keyFilter.test(
entry.file().rowCount(),
- entry.file().keyStats().fields(keyStatsConverter,
entry.file().rowCount()));
+ entry.file()
+ .keyStats()
+ .fields(
+
getFieldStatsArraySerializer(entry.file().schemaId()),
+ entry.file().rowCount()));
+ }
+
+ private FieldStatsArraySerializer getFieldStatsArraySerializer(long id) {
+ return schemaKeyStatsConverters.computeIfAbsent(
+ id,
+ key -> {
+ final TableSchema tableSchema = scanTableSchema();
+ final TableSchema schema = scanTableSchema(key);
+ final List<DataField> keyFields =
keyFieldsExtractor.keyFields(schema);
+ return new FieldStatsArraySerializer(
+ RowDataType.toRowType(false, keyFields),
+ tableSchema.id() == key
+ ? null
+ : SchemaEvolutionUtil.createIndexMapping(
+
keyFieldsExtractor.keyFields(tableSchema), keyFields));
+ });
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/KeyFieldsExtractor.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/KeyFieldsExtractor.java
new file mode 100644
index 00000000..937ed160
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/KeyFieldsExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Extractor of schema for different tables. */
+public interface KeyFieldsExtractor extends Serializable {
+ /**
+ * Extract key fields from table schema.
+ *
+ * @param schema the table schema
+ * @return the key fields
+ */
+ List<DataField> keyFields(TableSchema schema);
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
index 081b4b88..94916d13 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/RowDataType.java
@@ -40,7 +40,7 @@ public class RowDataType extends DataType {
this.fields = fields;
}
- private static RowType toRowType(boolean isNullable, List<DataField>
fields) {
+ public static RowType toRowType(boolean isNullable, List<DataField>
fields) {
List<RowType.RowField> typeFields = new ArrayList<>(fields.size());
for (DataField field : fields) {
typeFields.add(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
new file mode 100644
index 00000000..e8ee559b
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for schema evolution. */
+public class SchemaEvolutionUtil {
+
+ private static final int NULL_FIELD_INDEX = -1;
+
+ /**
+ * Create index mapping from table fields to underlying data fields.
+ *
+ * @param tableFields the fields of table
+ * @param dataFields the fields of underlying data
+ * @return the index mapping
+ */
+ @Nullable
+ public static int[] createIndexMapping(
+ List<DataField> tableFields, List<DataField> dataFields) {
+ int[] indexMapping = new int[tableFields.size()];
+ Map<Integer, Integer> fieldIdToIndex = new HashMap<>();
+ for (int i = 0; i < dataFields.size(); i++) {
+ fieldIdToIndex.put(dataFields.get(i).id(), i);
+ }
+
+ for (int i = 0; i < tableFields.size(); i++) {
+ int fieldId = tableFields.get(i).id();
+ Integer dataFieldIndex = fieldIdToIndex.get(fieldId);
+ if (dataFieldIndex != null) {
+ indexMapping[i] = dataFieldIndex;
+ } else {
+ indexMapping[i] = NULL_FIELD_INDEX;
+ }
+ }
+
+ for (int i = 0; i < indexMapping.length; i++) {
+ if (indexMapping[i] != i) {
+ return indexMapping;
+ }
+ }
+ return null;
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index bed3d8eb..9ef955ff 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -195,20 +195,25 @@ public class TableSchema implements Serializable {
return projectedLogicalRowType(trimmedPrimaryKeys());
}
+ public List<DataField> trimmedPrimaryKeysFields() {
+ return projectedDataFields(trimmedPrimaryKeys());
+ }
+
public int[] projection(List<String> projectedFieldNames) {
List<String> fieldNames = fieldNames();
return
projectedFieldNames.stream().mapToInt(fieldNames::indexOf).toArray();
}
- private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
+ private List<DataField> projectedDataFields(List<String>
projectedFieldNames) {
List<String> fieldNames = fieldNames();
+ return projectedFieldNames.stream()
+ .map(k -> fields.get(fieldNames.indexOf(k)))
+ .collect(Collectors.toList());
+ }
+
+ private RowType projectedLogicalRowType(List<String> projectedFieldNames) {
return (RowType)
- new RowDataType(
- false,
- projectedFieldNames.stream()
- .map(k ->
fields.get(fieldNames.indexOf(k)))
- .collect(Collectors.toList()))
- .logicalType;
+ new RowDataType(false,
projectedDataFields(projectedFieldNames)).logicalType;
}
public TableSchema copy(Map<String, String> newOptions) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 10d23e8b..3ce0bcf5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -43,7 +43,13 @@ public class FieldStatsArraySerializer {
private final RowData.FieldGetter[] fieldGetters;
+ @Nullable private final int[] indexMapping;
+
public FieldStatsArraySerializer(RowType type) {
+ this(type, null);
+ }
+
+ public FieldStatsArraySerializer(RowType type, int[] indexMapping) {
RowType safeType = toAllFieldsNullableRowType(type);
this.serializer = new RowDataSerializer(safeType);
this.fieldGetters =
@@ -53,6 +59,7 @@ public class FieldStatsArraySerializer {
RowDataUtils.createNullCheckingFieldGetter(
safeType.getTypeAt(i), i))
.toArray(RowData.FieldGetter[]::new);
+ this.indexMapping = indexMapping;
}
public BinaryTableStats toBinary(FieldStats[] stats) {
@@ -77,10 +84,11 @@ public class FieldStatsArraySerializer {
}
public FieldStats[] fromBinary(BinaryTableStats array, @Nullable Long
rowCount) {
- int fieldCount = fieldGetters.length;
+ int fieldCount = indexMapping == null ? fieldGetters.length :
indexMapping.length;
FieldStats[] stats = new FieldStats[fieldCount];
for (int i = 0; i < fieldCount; i++) {
- if (i >= array.min().getArity()) {
+ int fieldIndex = indexMapping == null ? i : indexMapping[i];
+ if (fieldIndex < 0 || fieldIndex >= array.min().getArity()) {
// simple evolution for add column
if (rowCount == null) {
throw new RuntimeException("Schema Evolution for stats
needs row count.");
@@ -89,9 +97,9 @@ public class FieldStatsArraySerializer {
} else {
stats[i] =
new FieldStats(
- fieldGetters[i].getFieldOrNull(array.min()),
- fieldGetters[i].getFieldOrNull(array.max()),
- array.nullCounts()[i]);
+
fieldGetters[fieldIndex].getFieldOrNull(array.min()),
+
fieldGetters[fieldIndex].getFieldOrNull(array.max()),
+ array.nullCounts()[fieldIndex]);
}
}
return stats;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index b96cbaa9..05294e60 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -29,6 +29,9 @@ import
org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -47,6 +50,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import java.util.List;
+
/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without
primary keys. */
public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@@ -61,6 +66,7 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
RowType.of(
new LogicalType[] {new BigIntType(false)}, new
String[] {"_VALUE_COUNT"});
MergeFunction<KeyValue> mergeFunction = new ValueCountMergeFunction();
+ KeyFieldsExtractor extractor =
ValueCountTableKeyFieldsExtractor.EXTRACTOR;
this.store =
new KeyValueFileStore(
schemaManager,
@@ -68,8 +74,9 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
new CoreOptions(tableSchema.options()),
tableSchema.logicalPartitionType(),
tableSchema.logicalBucketKeyType(),
- tableSchema.logicalRowType(),
+ RowDataType.toRowType(false,
extractor.keyFields(tableSchema)),
countType,
+ extractor,
mergeFunction);
}
@@ -146,4 +153,19 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
public KeyValueFileStore store() {
return store;
}
+
+ /** {@link KeyFieldsExtractor} implementation for {@link
ChangelogValueCountFileStoreTable}. */
+ static class ValueCountTableKeyFieldsExtractor implements
KeyFieldsExtractor {
+ private static final long serialVersionUID = 1L;
+
+ static final ValueCountTableKeyFieldsExtractor EXTRACTOR =
+ new ValueCountTableKeyFieldsExtractor();
+
+ private ValueCountTableKeyFieldsExtractor() {}
+
+ @Override
+ public List<DataField> keyFields(TableSchema schema) {
+ return schema.fields();
+ }
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 79dff30a..baef2dfa 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -31,6 +31,9 @@ import
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFun
import
org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -101,6 +104,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
}
CoreOptions options = new CoreOptions(conf);
+ KeyFieldsExtractor extractor =
ChangelogWithKeyKeyFieldsExtractor.EXTRACTOR;
this.store =
new KeyValueFileStore(
schemaManager,
@@ -108,12 +112,13 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
options,
tableSchema.logicalPartitionType(),
addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
-
addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()),
+ RowDataType.toRowType(false,
extractor.keyFields(tableSchema)),
rowType,
+ extractor,
mergeFunction);
}
- private RowType addKeyNamePrefix(RowType type) {
+ private static RowType addKeyNamePrefix(RowType type) {
// add prefix to avoid conflict with value
return new RowType(
type.getFields().stream()
@@ -126,6 +131,18 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
.collect(Collectors.toList()));
}
+ private static List<DataField> addKeyNamePrefix(List<DataField> keyFields)
{
+ return keyFields.stream()
+ .map(
+ f ->
+ new DataField(
+ f.id(),
+ KEY_FIELD_PREFIX + f.name(),
+ f.type(),
+ f.description()))
+ .collect(Collectors.toList());
+ }
+
@Override
public DataTableScan newScan() {
KeyValueFileStoreScan scan = store.newScan();
@@ -215,4 +232,18 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
public KeyValueFileStore store() {
return store;
}
+
+ static class ChangelogWithKeyKeyFieldsExtractor implements
KeyFieldsExtractor {
+ private static final long serialVersionUID = 1L;
+
+ static final ChangelogWithKeyKeyFieldsExtractor EXTRACTOR =
+ new ChangelogWithKeyKeyFieldsExtractor();
+
+ private ChangelogWithKeyKeyFieldsExtractor() {}
+
+ @Override
+ public List<DataField> keyFields(TableSchema schema) {
+ return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
+ }
+ }
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 09722d51..9d280801 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -38,6 +38,7 @@ import
org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -92,6 +93,7 @@ public class TestFileStore extends KeyValueFileStore {
RowType partitionType,
RowType keyType,
RowType valueType,
+ KeyFieldsExtractor keyFieldsExtractor,
MergeFunction<KeyValue> mergeFunction) {
super(
new SchemaManager(options.path()),
@@ -101,6 +103,7 @@ public class TestFileStore extends KeyValueFileStore {
keyType,
keyType,
valueType,
+ keyFieldsExtractor,
mergeFunction);
this.root = root;
this.keySerializer = new RowDataSerializer(keyType);
@@ -453,6 +456,7 @@ public class TestFileStore extends KeyValueFileStore {
private final RowType partitionType;
private final RowType keyType;
private final RowType valueType;
+ private final KeyFieldsExtractor keyFieldsExtractor;
private final MergeFunction<KeyValue> mergeFunction;
private CoreOptions.ChangelogProducer changelogProducer;
@@ -464,6 +468,7 @@ public class TestFileStore extends KeyValueFileStore {
RowType partitionType,
RowType keyType,
RowType valueType,
+ KeyFieldsExtractor keyFieldsExtractor,
MergeFunction<KeyValue> mergeFunction) {
this.format = format;
this.root = root;
@@ -471,6 +476,7 @@ public class TestFileStore extends KeyValueFileStore {
this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
+ this.keyFieldsExtractor = keyFieldsExtractor;
this.mergeFunction = mergeFunction;
this.changelogProducer = CoreOptions.ChangelogProducer.NONE;
@@ -500,7 +506,13 @@ public class TestFileStore extends KeyValueFileStore {
conf.set(CoreOptions.CHANGELOG_PRODUCER, changelogProducer);
return new TestFileStore(
- root, new CoreOptions(conf), partitionType, keyType,
valueType, mergeFunction);
+ root,
+ new CoreOptions(conf),
+ partitionType,
+ keyType,
+ valueType,
+ keyFieldsExtractor,
+ mergeFunction);
}
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 90a02764..28c76294 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
@@ -104,6 +107,8 @@ public class TestKeyValueGenerator {
new String[] {"shopId", "orderId", "itemId",
"priceAmount", "comment"});
public static final RowType NON_PARTITIONED_PART_TYPE = RowType.of();
+ public static final List<String> KEY_NAME_LIST = Arrays.asList("shopId",
"orderId");
+
public static final RowType KEY_TYPE =
RowType.of(
new LogicalType[] {new IntType(false), new
BigIntType(false)},
@@ -323,4 +328,19 @@ public class TestKeyValueGenerator {
SINGLE_PARTITIONED,
MULTI_PARTITIONED
}
+
+ /** {@link KeyFieldsExtractor} implementation for test. */
+ public static class TestKeyFieldsExtractor implements KeyFieldsExtractor {
+ private static final long serialVersionUID = 1L;
+
+ public static final TestKeyFieldsExtractor EXTRACTOR = new
TestKeyFieldsExtractor();
+
+ @Override
+ public List<DataField> keyFields(TableSchema schema) {
+ return schema.fields().stream()
+ .filter(f -> KEY_NAME_LIST.contains(f.name()))
+ .map(f -> new DataField(f.id(), "key_" + f.name(),
f.type(), f.description()))
+ .collect(Collectors.toList());
+ }
+ }
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java
index c8480cb9..cbc0a36b 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/io/DataFileTestUtils.java
@@ -86,4 +86,14 @@ public class DataFileTestUtils {
writer.complete();
return row;
}
+
+ public static BinaryRowData row(int... values) {
+ BinaryRowData row = new BinaryRowData(values.length);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ for (int i = 0; i < values.length; i++) {
+ writer.writeInt(i, values[i]);
+ }
+ writer.complete();
+ return row;
+ }
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 85b2733a..c24c71dc 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -537,6 +537,7 @@ public class FileStoreCommitTest {
TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
new DeduplicateMergeFunction())
.changelogProducer(changelogProducer)
.build();
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 90ac35ba..0c40fd28 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -92,6 +92,7 @@ public class FileStoreExpireTest {
TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
new DeduplicateMergeFunction())
.changelogProducer(changelogProducer)
.build();
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index 1dcc2abe..d2ec2bec 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -29,7 +29,10 @@ import
org.apache.flink.table.store.file.manifest.ManifestEntry;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.KeyFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -45,6 +48,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -90,12 +94,13 @@ public class KeyValueFileStoreReadTest {
RowType partitionType =
RowType.of(new LogicalType[] {new IntType(false)}, new
String[] {"c"});
RowDataSerializer partitionSerializer = new
RowDataSerializer(partitionType);
+ List<String> keyNames = Arrays.asList("a", "b", "c");
RowType keyType =
RowType.of(
new LogicalType[] {
new IntType(false), new IntType(false), new
IntType(false)
},
- new String[] {"a", "b", "c"});
+ keyNames.toArray(new String[0]));
RowType projectedKeyType = RowType.of(new IntType(false), new
IntType(false));
RowDataSerializer projectedKeySerializer = new
RowDataSerializer(projectedKeyType);
RowType valueType =
@@ -103,7 +108,21 @@ public class KeyValueFileStoreReadTest {
RowDataSerializer valueSerializer = new RowDataSerializer(valueType);
TestFileStore store =
- createStore(partitionType, keyType, valueType, new
ValueCountMergeFunction());
+ createStore(
+ partitionType,
+ keyType,
+ valueType,
+ new KeyFieldsExtractor() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public List<DataField> keyFields(TableSchema
schema) {
+ return schema.fields().stream()
+ .filter(f ->
keyNames.contains(f.name()))
+ .collect(Collectors.toList());
+ }
+ },
+ new ValueCountMergeFunction());
List<KeyValue> readData =
writeThenRead(
data,
@@ -142,6 +161,7 @@ public class KeyValueFileStoreReadTest {
TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
new DeduplicateMergeFunction());
RowDataSerializer projectedValueSerializer =
@@ -230,6 +250,7 @@ public class KeyValueFileStoreReadTest {
RowType partitionType,
RowType keyType,
RowType valueType,
+ KeyFieldsExtractor extractor,
MergeFunction<KeyValue> mergeFunction)
throws Exception {
SchemaManager schemaManager = new SchemaManager(new
Path(tempDir.toUri()));
@@ -254,6 +275,7 @@ public class KeyValueFileStoreReadTest {
partitionType,
keyType,
valueType,
+ extractor,
mergeFunction)
.build();
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index ea5bcda9..66698c66 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -71,6 +71,7 @@ public class KeyValueFileStoreScanTest {
TestKeyValueGenerator.DEFAULT_PART_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+
TestKeyValueGenerator.TestKeyFieldsExtractor.EXTRACTOR,
new DeduplicateMergeFunction())
.build();
snapshotManager = store.snapshotManager();
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
new file mode 100644
index 00000000..0e929c73
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/schema/SchemaEvolutionUtilTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.schema;
+
+import org.apache.flink.table.api.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SchemaEvolutionUtil}. */
+public class SchemaEvolutionUtilTest {
+ @Test
+ public void testCreateIndexMapping() {
+ List<DataField> dataFields =
+ Arrays.asList(
+ new DataField(0, "a", new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(1, "b", new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(2, "c", new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 3, "d", new
AtomicDataType(DataTypes.INT().getLogicalType())));
+ List<DataField> tableFields =
+ Arrays.asList(
+ new DataField(1, "c", new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(3, "a", new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(5, "d", new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 6, "e", new
AtomicDataType(DataTypes.INT().getLogicalType())));
+ int[] indexMapping =
SchemaEvolutionUtil.createIndexMapping(tableFields, dataFields);
+
+
assertThat(indexMapping.length).isEqualTo(tableFields.size()).isEqualTo(4);
+ assertThat(indexMapping[0]).isEqualTo(1);
+ assertThat(indexMapping[1]).isEqualTo(3);
+ assertThat(indexMapping[2]).isLessThan(0);
+ assertThat(indexMapping[3]).isLessThan(0);
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
new file mode 100644
index 00000000..2e63eea5
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.stats;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.schema.AtomicDataType;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaEvolutionUtil;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.format.FieldStats;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FieldStatsArraySerializer}. */
+public class FieldStatsArraySerializerTest {
+ @Test
+ public void testFromBinary() {
+ TableSchema dataSchema =
+ new TableSchema(
+ 0,
+ Arrays.asList(
+ new DataField(
+ 0,
+ "a",
+ new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 1,
+ "b",
+ new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 2,
+ "c",
+ new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 3,
+ "d",
+ new
AtomicDataType(DataTypes.INT().getLogicalType()))),
+ 3,
+ Collections.EMPTY_LIST,
+ Collections.EMPTY_LIST,
+ Collections.EMPTY_MAP,
+ "");
+ TableSchema tableSchema =
+ new TableSchema(
+ 0,
+ Arrays.asList(
+ new DataField(
+ 1,
+ "c",
+ new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 3,
+ "a",
+ new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 5,
+ "d",
+ new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 6,
+ "e",
+ new
AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(
+ 7,
+ "b",
+ new
AtomicDataType(DataTypes.INT().getLogicalType()))),
+ 7,
+ Collections.EMPTY_LIST,
+ Collections.EMPTY_LIST,
+ Collections.EMPTY_MAP,
+ "");
+
+ FieldStatsArraySerializer fieldStatsArraySerializer =
+ new FieldStatsArraySerializer(
+ tableSchema.logicalRowType(),
+ SchemaEvolutionUtil.createIndexMapping(
+ tableSchema.fields(), dataSchema.fields()));
+ BinaryRowData minRowData = row(1, 2, 3, 4);
+ BinaryRowData maxRowData = row(100, 99, 98, 97);
+ long[] nullCounts = new long[] {1, 0, 10, 100};
+ BinaryTableStats dataTableStats = new BinaryTableStats(minRowData,
maxRowData, nullCounts);
+
+ FieldStats[] fieldStatsArray =
dataTableStats.fields(fieldStatsArraySerializer, 1000L);
+
assertThat(fieldStatsArray.length).isEqualTo(tableSchema.fields().size()).isEqualTo(5);
+ checkFieldStats(fieldStatsArray[0], 2, 99, 0L);
+ checkFieldStats(fieldStatsArray[1], 4, 97, 100L);
+ checkFieldStats(fieldStatsArray[2], null, null, 1000L);
+ checkFieldStats(fieldStatsArray[3], null, null, 1000L);
+ checkFieldStats(fieldStatsArray[4], null, null, 1000L);
+ }
+
+ private void checkFieldStats(FieldStats fieldStats, Integer min, Integer
max, Long nullCount) {
+ assertThat(fieldStats.minValue()).isEqualTo(min);
+ assertThat(fieldStats.maxValue()).isEqualTo(max);
+ assertThat(fieldStats.nullCount()).isEqualTo(nullCount);
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableFileMetaFilterTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableFileMetaFilterTest.java
new file mode 100644
index 00000000..84166a03
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyTableFileMetaFilterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Map;
+
+/** Tests for meta files in {@link AppendOnlyFileStoreTable} with schema
evolution. */
+public class AppendOnlyTableFileMetaFilterTest extends FileMetaFilterTestBase {
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable(Map<Long, TableSchema>
tableSchemas) {
+ SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
+ return new AppendOnlyFileStoreTable(tablePath, schemaManager,
schemaManager.latest().get());
+ }
+
+ @Override
+ protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+ return fileMeta.valueStats();
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileMetaFilterTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileMetaFilterTest.java
new file mode 100644
index 00000000..28667fd9
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileMetaFilterTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Tests for meta files in {@link ChangelogValueCountFileStoreTable} with
schema evolution. */
+public class ChangelogValueCountFileMetaFilterTest extends
FileMetaFilterTestBase {
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ }
+
+ @Override
+ protected List<String> getPrimaryKeyNames() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable(Map<Long, TableSchema>
tableSchemas) {
+ SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
+ return new ChangelogValueCountFileStoreTable(
+ tablePath, schemaManager, schemaManager.latest().get());
+ }
+
+ @Override
+ protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+ return fileMeta.keyStats();
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileMetaFilterTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileMetaFilterTest.java
new file mode 100644
index 00000000..b664f259
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileMetaFilterTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Tests for meta files in {@link ChangelogWithKeyFileStoreTable} with schema
evolution. */
+public class ChangelogWithKeyFileMetaFilterTest extends FileMetaFilterTestBase
{
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ tableConfig.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ }
+
+ @Test
+ @Override
+ public void testTableScan() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ checkFilterRowCount(plan, 6L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ checkFilterRowCount(plan, 12L);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Test
+ @Override
+ public void testTableScanFilterExistFields() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ // results of field "b" in [14, 19] in SCHEMA_0_FIELDS,
"b" is renamed to "d" in
+ // SCHEMA_1_FIELDS
+ Predicate predicate =
+ new
PredicateBuilder(table.schema().logicalRowType())
+ .between(2, 14, 19);
+ DataTableScan.DataFilePlan plan =
table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 6L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ PredicateBuilder builder =
+ new PredicateBuilder(table.schema().logicalRowType());
+ // rows where field "d" is in [14, 19] under SCHEMA_1_FIELDS
+ Predicate predicate = builder.between(1, 14, 19);
+ DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 12L);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Test
+ @Override
+ public void testTableScanFilterNewFields() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ checkFilterRowCount(plan, 6L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ PredicateBuilder builder =
+ new PredicateBuilder(table.schema().logicalRowType());
+ // rows where field "a" is greater than 1120 under SCHEMA_1_FIELDS; "a" does not exist in
+ // SCHEMA_0_FIELDS
+ Predicate predicate = builder.greaterThan(3, 1120);
+ DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 12L);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Override
+ protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+ SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
+ return new ChangelogWithKeyFileStoreTable(
+ tablePath, schemaManager, schemaManager.latest().get());
+ }
+
+ @Override
+ protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
+ throw new UnsupportedOperationException();
+ }
+}
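
Aside: the index arguments passed to PredicateBuilder above are positions in the row type being read, not field ids, which is why the "same" filter uses index 2 against schema 0 but index 1 against schema 1. A short sketch under that assumption — schema0 and schema1 are placeholder variables standing for the two TableSchema instances defined in FileMetaFilterTestBase below:

    // "b" (field id 2) sits at position 2 in schema 0; after the rename to
    // "d" it sits at position 1 in schema 1, so the filter index shifts too.
    Predicate onSchema0 =
            new PredicateBuilder(schema0.logicalRowType()).between(2, 14, 19);
    Predicate onSchema1 =
            new PredicateBuilder(schema1.logicalRowType()).between(1, 14, 19);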
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
new file mode 100644
index 00000000..680ef984
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileMetaFilterTestBase.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
+import org.apache.flink.table.store.file.schema.AtomicDataType;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaChange;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.TraceableFileSystem;
+import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.DataSplit;
+import org.apache.flink.table.store.table.source.DataTableScan;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for schema evolution in {@link FileStoreTable}. */
+public abstract class FileMetaFilterTestBase {
+ protected static final List<DataField> SCHEMA_0_FIELDS =
+ Arrays.asList(
+ new DataField(0, "a", new AtomicDataType(DataTypes.STRING().getLogicalType())),
+ new DataField(1, "pt", new AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(2, "b", new AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(3, "c", new AtomicDataType(DataTypes.STRING().getLogicalType())),
+ new DataField(4, "kt", new AtomicDataType(DataTypes.BIGINT().getLogicalType())),
+ new DataField(5, "d", new AtomicDataType(DataTypes.STRING().getLogicalType())));
+ protected static final List<DataField> SCHEMA_1_FIELDS =
+ Arrays.asList(
+ new DataField(1, "pt", new AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(2, "d", new AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(4, "kt", new AtomicDataType(DataTypes.BIGINT().getLogicalType())),
+ new DataField(6, "a", new AtomicDataType(DataTypes.INT().getLogicalType())),
+ new DataField(7, "f", new AtomicDataType(DataTypes.STRING().getLogicalType())),
+ new DataField(8, "b", new AtomicDataType(DataTypes.STRING().getLogicalType())));
+ protected static final List<String> PARTITION_NAMES = Collections.singletonList("pt");
+ protected static final List<String> PRIMARY_KEY_NAMES = Arrays.asList("pt", "kt");
+
+ protected Path tablePath;
+ protected String commitUser;
+ protected final Configuration tableConfig = new Configuration();
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @BeforeEach
+ public void before() throws Exception {
+ tablePath = new Path(TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+ commitUser = UUID.randomUUID().toString();
+ tableConfig.set(CoreOptions.PATH, tablePath.toString());
+ tableConfig.set(CoreOptions.BUCKET, 2);
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ // assert all connections are closed
+ FileSystem fileSystem = tablePath.getFileSystem();
+ assertThat(fileSystem).isInstanceOf(TraceableFileSystem.class);
+ TraceableFileSystem traceableFileSystem = (TraceableFileSystem) fileSystem;
+
+ java.util.function.Predicate<Path> pathPredicate =
+ path -> path.toString().contains(tempDir.toString());
+ assertThat(traceableFileSystem.openInputStreams(pathPredicate)).isEmpty();
+ assertThat(traceableFileSystem.openOutputStreams(pathPredicate)).isEmpty();
+ }
+
+ @Test
+ public void testTableScan() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ DataTableScan.DataFilePlan plan = table.newScan().plan();
+ checkFilterRowCount(plan, 6L);
+ return plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ DataTableScan.DataFilePlan plan =
+ table.newScan()
+ .withFilter(
+ new PredicateBuilder(table.schema().logicalRowType())
+ .greaterOrEqual(1, 0))
+ .plan();
+ checkFilterRowCount(plan, 12L);
+
+ List<String> filesName =
+ files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+ assertThat(filesName.size()).isGreaterThan(0);
+
+ List<DataFileMeta> fileMetaList =
+ plan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ assertThat(
+ fileMetaList.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList()))
+ .containsAll(filesName);
+
+ for (DataFileMeta fileMeta : fileMetaList) {
+ FieldStats[] statsArray = getTableValueStats(fileMeta).fields(null);
+ assertThat(statsArray.length).isEqualTo(6);
+ if (filesName.contains(fileMeta.fileName())) {
+ assertThat(statsArray[0].minValue()).isNotNull();
+ assertThat(statsArray[0].maxValue()).isNotNull();
+ assertThat(statsArray[1].minValue()).isNotNull();
+ assertThat(statsArray[1].maxValue()).isNotNull();
+ assertThat(statsArray[2].minValue()).isNotNull();
+ assertThat(statsArray[2].maxValue()).isNotNull();
+
+ assertThat(statsArray[3].minValue()).isNull();
+ assertThat(statsArray[3].maxValue()).isNull();
+ assertThat(statsArray[4].minValue()).isNull();
+ assertThat(statsArray[4].maxValue()).isNull();
+ assertThat(statsArray[5].minValue()).isNull();
+ assertThat(statsArray[5].maxValue()).isNull();
+ } else {
+ assertThat(statsArray[0].minValue()).isNotNull();
+ assertThat(statsArray[0].maxValue()).isNotNull();
+ assertThat(statsArray[1].minValue()).isNotNull();
+ assertThat(statsArray[1].maxValue()).isNotNull();
+ assertThat(statsArray[2].minValue()).isNotNull();
+ assertThat(statsArray[2].maxValue()).isNotNull();
+ assertThat(statsArray[3].minValue()).isNotNull();
+ assertThat(statsArray[3].maxValue()).isNotNull();
+ assertThat(statsArray[4].minValue()).isNotNull();
+ assertThat(statsArray[4].maxValue()).isNotNull();
+ assertThat(statsArray[5].minValue()).isNotNull();
+ assertThat(statsArray[5].maxValue()).isNotNull();
+ }
+ }
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Test
+ public void testTableScanFilterExistFields() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ // results of field "b" in [14, 19] in SCHEMA_0_FIELDS,
"b" is renamed to "d" in
+ // SCHEMA_1_FIELDS
+ Predicate predicate =
+ new
PredicateBuilder(table.schema().logicalRowType())
+ .between(2, 14, 19);
+ List<DataFileMeta> files =
+ table.newScan().withFilter(predicate).plan().splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ assertThat(files.size()).isGreaterThan(0);
+ checkFilterRowCount(files, 3L);
+ return files;
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ PredicateBuilder builder =
+ new PredicateBuilder(table.schema().logicalRowType());
+ // rows where field "d" is in [14, 19] under SCHEMA_1_FIELDS
+ Predicate predicate = builder.between(1, 14, 19);
+ DataTableScan.DataFilePlan filterFilePlan =
+ table.newScan().withFilter(predicate).plan();
+ List<DataFileMeta> filterFileMetas =
+ filterFilePlan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ checkFilterRowCount(filterFileMetas, 6L);
+
+ List<String> fileNameList =
+ filterFileMetas.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList());
+ Set<String> fileNames =
+ filterFileMetas.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toSet());
+ assertThat(fileNameList.size()).isEqualTo(fileNames.size());
+
+ builder = new PredicateBuilder(table.schema().logicalRowType());
+ // get all meta files with filter
+ DataTableScan.DataFilePlan filterAllFilePlan =
+ table.newScan().withFilter(builder.greaterOrEqual(1, 0)).plan();
+ assertThat(
+ filterAllFilePlan.splits.stream()
+ .flatMap(
+ s ->
+ s.files().stream()
+ .map(DataFileMeta::fileName))
+ .collect(Collectors.toList()))
+ .containsAll(
+ files.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList()));
+
+ // get all meta files without filter
+ DataTableScan.DataFilePlan allFilePlan = table.newScan().plan();
+ assertThat(filterAllFilePlan.splits).isEqualTo(allFilePlan.splits);
+
+ Set<String> filterFileNames = new HashSet<>();
+ for (DataSplit dataSplit : filterAllFilePlan.splits) {
+ for (DataFileMeta dataFileMeta : dataSplit.files()) {
+ FieldStats[] fieldStats = getTableValueStats(dataFileMeta).fields(null);
+ int minValue = (Integer) fieldStats[1].minValue();
+ int maxValue = (Integer) fieldStats[1].maxValue();
+ if (minValue >= 14
+ && minValue <= 19
+ && maxValue >= 14
+ && maxValue <= 19) {
+ filterFileNames.add(dataFileMeta.fileName());
+ }
+ }
+ }
+ assertThat(filterFileNames).isEqualTo(fileNames);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Test
+ public void testTableScanFilterNewFields() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ List<DataFileMeta> files =
+ table.newScan().plan().splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ assertThat(files.size()).isGreaterThan(0);
+ checkFilterRowCount(files, 6L);
+ return files;
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ PredicateBuilder builder =
+ new PredicateBuilder(table.schema().logicalRowType());
+
+ // rows where field "a" is greater than 1120 under SCHEMA_1_FIELDS; "a" does not exist in
+ // SCHEMA_0_FIELDS
+ Predicate predicate = builder.greaterThan(3, 1120);
+ DataTableScan.DataFilePlan filterFilePlan =
+ table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(filterFilePlan, 2L);
+
+ List<DataFileMeta> filterFileMetas =
+ filterFilePlan.splits.stream()
+ .flatMap(s -> s.files().stream())
+ .collect(Collectors.toList());
+ List<String> fileNameList =
+ filterFileMetas.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toList());
+ Set<String> fileNames =
+ filterFileMetas.stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.toSet());
+ assertThat(fileNameList.size()).isEqualTo(fileNames.size());
+
+ List<String> filesName =
+ files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
+ assertThat(fileNameList).doesNotContainAnyElementsOf(filesName);
+
+ DataTableScan.DataFilePlan allFilePlan =
+ table.newScan().withFilter(builder.greaterOrEqual(1, 0)).plan();
+ checkFilterRowCount(allFilePlan, 12L);
+
+ Set<String> filterFileNames = new HashSet<>();
+ for (DataSplit dataSplit : allFilePlan.splits) {
+ for (DataFileMeta dataFileMeta : dataSplit.files()) {
+ FieldStats[] fieldStats = getTableValueStats(dataFileMeta).fields(null);
+ Integer minValue = (Integer) fieldStats[3].minValue();
+ Integer maxValue = (Integer) fieldStats[3].maxValue();
+ if (minValue != null
+ && maxValue != null
+ && minValue > 1120
+ && maxValue > 1120) {
+ filterFileNames.add(dataFileMeta.fileName());
+ }
+ }
+ }
+ assertThat(filterFileNames).isEqualTo(fileNames);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Test
+ public void testTableScanFilterPartition() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ checkFilterRowCount(table, 1, 1, 3L);
+ checkFilterRowCount(table, 1, 2, 3L);
+ return null;
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ checkFilterRowCount(table, 0, 1, 7L);
+ checkFilterRowCount(table, 0, 2, 5L);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ @Test
+ public void testTableScanFilterPrimaryKey() throws Exception {
+ writeAndCheckFileMeta(
+ schemas -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ PredicateBuilder builder =
+ new PredicateBuilder(table.schema().logicalRowType());
+ Predicate predicate = builder.between(4, 115L, 120L);
+ DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 2L);
+ return null;
+ },
+ (files, schemas) -> {
+ FileStoreTable table = createFileStoreTable(schemas);
+ PredicateBuilder builder =
+ new PredicateBuilder(table.schema().logicalRowType());
+ Predicate predicate = builder.between(2, 115L, 120L);
+ DataTableScan.DataFilePlan plan = table.newScan().withFilter(predicate).plan();
+ checkFilterRowCount(plan, 6L);
+ },
+ getPrimaryKeyNames(),
+ tableConfig,
+ this::createFileStoreTable);
+ }
+
+ protected List<String> getPrimaryKeyNames() {
+ return PRIMARY_KEY_NAMES;
+ }
+
+ protected abstract FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas);
+
+ protected abstract BinaryTableStats getTableValueStats(DataFileMeta fileMeta);
+
+ public static <R> void writeAndCheckFileMeta(
+ Function<Map<Long, TableSchema>, R> firstChecker,
+ BiConsumer<R, Map<Long, TableSchema>> secondChecker,
+ List<String> primaryKeyNames,
+ Configuration tableConfig,
+ Function<Map<Long, TableSchema>, FileStoreTable> createFileStoreTable)
+ throws Exception {
+ Map<Long, TableSchema> tableSchemas = new HashMap<>();
+ tableSchemas.put(
+ 0L,
+ new TableSchema(
+ 0,
+ SCHEMA_0_FIELDS,
+ 5,
+ PARTITION_NAMES,
+ primaryKeyNames,
+ tableConfig.toMap(),
+ ""));
+ FileStoreTable table = createFileStoreTable.apply(tableSchemas);
+ TableWrite write = table.newWrite("user");
+ TableCommit commit = table.newCommit("user");
+
+ write.write(
+ GenericRowData.of(
+ StringData.fromString("S001"),
+ 1,
+ 11,
+ StringData.fromString("S11"),
+ 111L,
+ StringData.fromString("S111")));
+ write.write(
+ GenericRowData.of(
+ StringData.fromString("S002"),
+ 2,
+ 12,
+ StringData.fromString("S12"),
+ 112L,
+ StringData.fromString("S112")));
+ write.write(
+ GenericRowData.of(
+ StringData.fromString("S003"),
+ 1,
+ 13,
+ StringData.fromString("S13"),
+ 113L,
+ StringData.fromString("S113")));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(
+ GenericRowData.of(
+ StringData.fromString("S004"),
+ 1,
+ 14,
+ StringData.fromString("S14"),
+ 114L,
+ StringData.fromString("S114")));
+ write.write(
+ GenericRowData.of(
+ StringData.fromString("S005"),
+ 2,
+ 15,
+ StringData.fromString("S15"),
+ 115L,
+ StringData.fromString("S115")));
+ write.write(
+ GenericRowData.of(
+ StringData.fromString("S006"),
+ 2,
+ 16,
+ StringData.fromString("S16"),
+ 116L,
+ StringData.fromString("S116")));
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+ R result = firstChecker.apply(tableSchemas);
+
+ tableSchemas.put(
+ 1L,
+ new TableSchema(
+ 1,
+ SCHEMA_1_FIELDS,
+ 8,
+ PARTITION_NAMES,
+ primaryKeyNames,
+ tableConfig.toMap(),
+ ""));
+ table = createFileStoreTable.apply(tableSchemas);
+ write = table.newWrite("user");
+ commit = table.newCommit("user");
+
+ write.write(
+ GenericRowData.of(
+ 1,
+ 17,
+ 117L,
+ 1117,
+ StringData.fromString("S007"),
+ StringData.fromString("S17")));
+ write.write(
+ GenericRowData.of(
+ 2,
+ 18,
+ 118L,
+ 1118,
+ StringData.fromString("S008"),
+ StringData.fromString("S18")));
+ write.write(
+ GenericRowData.of(
+ 1,
+ 19,
+ 119L,
+ 1119,
+ StringData.fromString("S009"),
+ StringData.fromString("S19")));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(
+ GenericRowData.of(
+ 2,
+ 20,
+ 120L,
+ 1120,
+ StringData.fromString("S010"),
+ StringData.fromString("S20")));
+ write.write(
+ GenericRowData.of(
+ 1,
+ 21,
+ 121L,
+ 1121,
+ StringData.fromString("S011"),
+ StringData.fromString("S21")));
+ write.write(
+ GenericRowData.of(
+ 1,
+ 22,
+ 122L,
+ 1122,
+ StringData.fromString("S012"),
+ StringData.fromString("S22")));
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+
+ secondChecker.accept(result, tableSchemas);
+ }
+
+ protected static void checkFilterRowCount(
+ FileStoreTable table, int index, int value, long expectedRowCount) {
+ PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
+ DataTableScan.DataFilePlan plan =
+ table.newScan().withFilter(builder.equal(index, value)).plan();
+ checkFilterRowCount(plan, expectedRowCount);
+ }
+
+ protected static void checkFilterRowCount(
+ DataTableScan.DataFilePlan plan, long expectedRowCount) {
+ List<DataFileMeta> fileMetaList =
+ plan.splits.stream().flatMap(s -> s.files().stream()).collect(Collectors.toList());
+ checkFilterRowCount(fileMetaList, expectedRowCount);
+ }
+
+ protected static void checkFilterRowCount(
+ List<DataFileMeta> fileMetaList, long expectedRowCount) {
+ assertThat(fileMetaList.stream().mapToLong(DataFileMeta::rowCount).sum())
+ .isEqualTo(expectedRowCount);
+ }
+
+ /** {@link SchemaManager} subclass for testing. */
+ protected static class TestingSchemaManager extends SchemaManager {
+ private final Map<Long, TableSchema> tableSchemas;
+
+ public TestingSchemaManager(Path tableRoot, Map<Long, TableSchema> tableSchemas) {
+ super(tableRoot);
+ this.tableSchemas = tableSchemas;
+ }
+
+ @Override
+ public Optional<TableSchema> latest() {
+ return Optional.of(
+ tableSchemas.get(
+ tableSchemas.keySet().stream()
+ .max(Long::compareTo)
+ .orElseThrow(IllegalStateException::new)));
+ }
+
+ @Override
+ public List<TableSchema> listAll() {
+ return new ArrayList<>(tableSchemas.values());
+ }
+
+ @Override
+ public List<Long> listAllIds() {
+ return new ArrayList<>(tableSchemas.keySet());
+ }
+
+ @Override
+ public TableSchema commitNewVersion(UpdateSchema updateSchema) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TableSchema schema(long id) {
+ return checkNotNull(tableSchemas.get(id));
+ }
+ }
+}
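
Aside: for anyone adding another table type to this suite, the base class needs only the two hooks above. A hypothetical minimal subclass — the class name is illustrative, and the body mirrors the value-count variant earlier in this patch:

    /** Hypothetical example, not part of this patch. */
    public class ExampleFileMetaFilterTest extends FileMetaFilterTestBase {

        @Override
        protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
            // Serve schemas from the in-memory map instead of the file system.
            SchemaManager schemaManager = new TestingSchemaManager(tablePath, tableSchemas);
            return new ChangelogValueCountFileStoreTable(
                    tablePath, schemaManager, schemaManager.latest().get());
        }

        @Override
        protected BinaryTableStats getTableValueStats(DataFileMeta fileMeta) {
            // Value-count tables keep row stats in the key stats column.
            return fileMeta.keyStats();
        }
    }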