This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 19f2973d [FLINK-28159] Table Store: Bucket pruning based on bucket key filter
19f2973d is described below
commit 19f2973d6bc59b0d0310c48eae8bb97741fb31d9
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 6 17:54:57 2022 +0800
[FLINK-28159] Table Store: Bucket pruning based on bucket key filter
This closes #199
---
.../table/store/connector/PredicateITCase.java | 82 +++++++++++
.../table/store/file/AppendOnlyFileStore.java | 4 +
.../flink/table/store/file/KeyValueFileStore.java | 4 +
.../file/operation/AbstractFileStoreScan.java | 19 ++-
.../file/operation/AppendOnlyFileStoreScan.java | 19 +++
.../file/operation/KeyValueFileStoreScan.java | 19 +++
.../flink/table/store/file/predicate/And.java | 2 +-
.../table/store/file/predicate/BucketSelector.java | 160 +++++++++++++++++++++
.../store/file/predicate/CompoundPredicate.java | 22 ++-
.../flink/table/store/file/predicate/In.java | 2 +-
.../table/store/file/predicate/LeafFunction.java | 22 ++-
.../store/file/predicate/LeafUnaryFunction.java | 2 +-
.../flink/table/store/file/predicate/NotIn.java | 2 +-
.../predicate/NullFalseLeafBinaryFunction.java | 2 +-
.../flink/table/store/file/predicate/Or.java | 2 +-
.../store/file/predicate/PredicateBuilder.java | 62 +++++++-
.../flink/table/store/file/schema/TableSchema.java | 20 ++-
.../store/table/AppendOnlyFileStoreTable.java | 1 +
.../table/ChangelogValueCountFileStoreTable.java | 1 +
.../table/ChangelogWithKeyFileStoreTable.java | 52 ++++---
.../store/table/sink/SinkRecordConverter.java | 2 +-
.../flink/table/store/table/source/TableScan.java | 35 +----
.../flink/table/store/file/TestFileStore.java | 1 +
.../store/file/predicate/BucketSelectorTest.java | 147 +++++++++++++++++++
.../store/table/AppendOnlyFileStoreTableTest.java | 5 +-
.../ChangelogValueCountFileStoreTableTest.java | 5 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 6 +-
.../table/store/table/FileStoreTableTestBase.java | 39 ++++-
.../table/store/table/WritePreemptMemoryTest.java | 5 +-
.../store/table/sink/SinkRecordConverterTest.java | 11 +-
30 files changed, 665 insertions(+), 90 deletions(-)
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
new file mode 100644
index 00000000..4e56fec8
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Predicate ITCase: each table is bucketed on column {@code a} (explicitly via 'bucket-key', or
+ * implicitly through the primary key), so the filter {@code a = 5} exercises bucket pruning while
+ * the test checks that the matching row is still returned.
+ */
+public class PredicateITCase extends AbstractTestBase {
+
+    private TableEnvironment tEnv;
+
+    @Before
+    public void before() throws IOException {
+        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG TABLE_STORE WITH ("
+                                + "'type'='table-store', 'warehouse'='%s')",
+                        TEMPORARY_FOLDER.newFolder().toURI()));
+        tEnv.useCatalog("TABLE_STORE");
+    }
+
+    @Test
+    public void testPkFilterBucket() throws Exception {
+        sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '5')");
+        innerTest();
+    }
+
+    @Test
+    public void testNoPkFilterBucket() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a')");
+        innerTest();
+    }
+
+    @Test
+    public void testAppendFilterBucket() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT) WITH ("
+                        + "'bucket' = '5', 'bucket-key'='a', 'write-mode'='append-only')");
+        innerTest();
+    }
+
+    /** Writes rows spread over several buckets, then filters on the bucket key. */
+    private void innerTest() throws Exception {
+        sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
+        assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
+    }
+
+    /**
+     * Executes a statement and collects all result rows.
+     *
+     * @param query SQL text; treated as a {@link String#format} pattern only when {@code args} is
+     *     non-empty, so a literal '%' in a plain query is not misinterpreted
+     * @param args optional format arguments
+     */
+    private List<Row> sql(String query, Object... args) throws Exception {
+        String statement = args.length == 0 ? query : String.format(query, args);
+        try (CloseableIterator<Row> iter = tEnv.executeSql(statement).collect()) {
+            return ImmutableList.copyOf(iter);
+        }
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 40d4203e..8a803a74 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
/** {@link FileStore} for reading and writing {@link RowData}. */
public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
+ private final RowType bucketKeyType;
private final RowType rowType;
public AppendOnlyFileStore(
@@ -36,8 +37,10 @@ public class AppendOnlyFileStore extends
AbstractFileStore<RowData> {
long schemaId,
CoreOptions options,
RowType partitionType,
+ RowType bucketKeyType,
RowType rowType) {
super(schemaManager, schemaId, options, partitionType);
+ this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
}
@@ -67,6 +70,7 @@ public class AppendOnlyFileStore extends
AbstractFileStore<RowData> {
private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
return new AppendOnlyFileStoreScan(
partitionType,
+ bucketKeyType.getFieldCount() == 0 ? rowType : bucketKeyType,
rowType,
snapshotManager(),
manifestFileFactory(),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 58b43685..9c7ae86e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -36,6 +36,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
private static final long serialVersionUID = 1L;
+ private final RowType bucketKeyType;
private final RowType keyType;
private final RowType valueType;
private final Supplier<Comparator<RowData>> keyComparatorSupplier;
@@ -46,10 +47,12 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
long schemaId,
CoreOptions options,
RowType partitionType,
+ RowType bucketKeyType,
RowType keyType,
RowType valueType,
MergeFunction mergeFunction) {
super(schemaManager, schemaId, options, partitionType);
+ this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
this.valueType = valueType;
this.mergeFunction = mergeFunction;
@@ -93,6 +96,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
return new KeyValueFileStoreScan(
partitionType,
+ bucketKeyType,
keyType,
snapshotManager(),
manifestFileFactory(),
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 345ed23c..6494c31a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -25,6 +25,7 @@ import
org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.BucketSelector;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -50,6 +51,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final FieldStatsArraySerializer partitionStatsConverter;
private final RowDataToObjectArrayConverter partitionConverter;
+ protected final RowType bucketKeyType;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
private final ManifestList manifestList;
@@ -57,6 +59,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final boolean checkNumOfBuckets;
private Predicate partitionFilter;
+ private BucketSelector bucketSelector;
private Long specifiedSnapshotId = null;
private Integer specifiedBucket = null;
@@ -65,6 +68,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
public AbstractFileStoreScan(
RowType partitionType,
+ RowType bucketKeyType,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
@@ -72,6 +76,9 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
boolean checkNumOfBuckets) {
this.partitionStatsConverter = new
FieldStatsArraySerializer(partitionType);
this.partitionConverter = new
RowDataToObjectArrayConverter(partitionType);
+ Preconditions.checkArgument(
+ bucketKeyType.getFieldCount() > 0, "The bucket keys should not
be empty.");
+ this.bucketKeyType = bucketKeyType;
this.snapshotManager = snapshotManager;
this.manifestFileFactory = manifestFileFactory;
this.manifestList = manifestListFactory.create();
@@ -85,6 +92,11 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return this;
}
+ /**
+ * Sets a filter on the bucket keys, used to skip manifest entries whose bucket cannot match.
+ *
+ * <p>If {@link BucketSelector#create} cannot derive a selector from the predicate, the
+ * selector is set to null and no bucket pruning is applied.
+ *
+ * @param predicate predicate whose field indexes refer to {@code bucketKeyType}
+ * @return this scan
+ */
+ protected FileStoreScan withBucketKeyFilter(Predicate predicate) {
+ this.bucketSelector = BucketSelector.create(predicate,
bucketKeyType).orElse(null);
+ return this;
+ }
+
@Override
public FileStoreScan withPartitionFilter(List<BinaryRowData> partitions) {
PredicateBuilder builder = new
PredicateBuilder(partitionConverter.rowType());
@@ -228,7 +240,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
// however entry.bucket() was computed against the old numOfBuckets
// and thus the filtered manifest entries might be empty
// which renders the bucket check invalid
- if (filterByBucket(file)) {
+ if (filterByBucket(file) && filterByBucketSelector(file)) {
files.add(file);
}
}
@@ -267,6 +279,11 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return (specifiedBucket == null || entry.bucket() == specifiedBucket);
}
+ /**
+ * Returns true when no bucket selector is set, or when the selector accepts the entry's
+ * bucket for the total bucket count recorded on the entry itself.
+ */
+ private boolean filterByBucketSelector(ManifestEntry entry) {
+ return (bucketSelector == null
+ || bucketSelector.select(entry.bucket(),
entry.totalBuckets()));
+ }
+
protected abstract boolean filterByStats(ManifestEntry entry);
private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta
manifest) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index 24ca55d8..746bf16f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -26,15 +26,23 @@ import
org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
+import java.util.List;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
/** {@link FileStoreScan} for {@link
org.apache.flink.table.store.file.AppendOnlyFileStore}. */
public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
private final FieldStatsArraySerializer rowStatsConverter;
+ private final RowType rowType;
private Predicate filter;
public AppendOnlyFileStoreScan(
RowType partitionType,
+ RowType bucketKeyType,
RowType rowType,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
@@ -43,16 +51,27 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
boolean checkNumOfBuckets) {
super(
partitionType,
+ bucketKeyType,
snapshotManager,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
checkNumOfBuckets);
this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
+ this.rowType = rowType;
}
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
this.filter = predicate;
+
+ List<Predicate> bucketFilters =
+ pickTransformFieldMapping(
+ splitAnd(predicate),
+ rowType.getFieldNames(),
+ bucketKeyType.getFieldNames());
+ if (bucketFilters.size() > 0) {
+ withBucketKeyFilter(and(bucketFilters));
+ }
return this;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index aa6bc701..0ac3e28f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -26,15 +26,23 @@ import
org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
+import java.util.List;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
/** {@link FileStoreScan} for {@link
org.apache.flink.table.store.file.KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {
private final FieldStatsArraySerializer keyStatsConverter;
+ private final RowType keyType;
private Predicate keyFilter;
public KeyValueFileStoreScan(
RowType partitionType,
+ RowType bucketKeyType,
RowType keyType,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
@@ -43,16 +51,27 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
boolean checkNumOfBuckets) {
super(
partitionType,
+ bucketKeyType,
snapshotManager,
manifestFileFactory,
manifestListFactory,
numOfBuckets,
checkNumOfBuckets);
this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
+ this.keyType = keyType;
}
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
this.keyFilter = predicate;
+
+ List<Predicate> bucketFilters =
+ pickTransformFieldMapping(
+ splitAnd(predicate),
+ keyType.getFieldNames(),
+ bucketKeyType.getFieldNames());
+ if (bucketFilters.size() > 0) {
+ withBucketKeyFilter(and(bucketFilters));
+ }
return this;
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
index dc256767..339b459e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Optional;
/** A {@link CompoundPredicate.Function} to eval and. */
-public class And implements CompoundPredicate.Function {
+public class And extends CompoundPredicate.Function {
private static final long serialVersionUID = 1L;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
new file mode 100644
index 00000000..43753a26
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitOr;
+
+/**
+ * Selects the buckets that may contain rows matching a {@link Predicate} on the bucket keys.
+ *
+ * <p>{@link #create} enumerates every bucket-key row the predicate can match (each bucket key
+ * column must be pinned by a conjunct made of {@code =} / {@code IN} leaves, possibly OR-ed, on
+ * that single column) and keeps the hash code of each such row. {@link #select} maps these hash
+ * codes to bucket ids for the bucket count a file was written with.
+ */
+@ThreadSafe
+public class BucketSelector implements Serializable {
+
+    /** Maximum number of bucket-key rows {@link #create} is willing to enumerate. */
+    public static final int MAX_VALUES = 1000;
+
+    private static final long serialVersionUID = 1L;
+
+    /** Hash codes of the binary bucket-key rows that may match the predicate. */
+    private final int[] hashCodes;
+
+    /** Cache: total bucket count -> bucket ids that may match. */
+    private final Map<Integer, Set<Integer>> buckets = new ConcurrentHashMap<>();
+
+    public BucketSelector(int[] hashCodes) {
+        this.hashCodes = hashCodes;
+    }
+
+    /**
+     * Returns whether {@code bucket} may contain rows matching the predicate.
+     *
+     * @param bucket bucket id of a data file
+     * @param numBucket total number of buckets the file was written with
+     */
+    public boolean select(int bucket, int numBucket) {
+        return buckets.computeIfAbsent(numBucket, k -> createBucketSet(k)).contains(bucket);
+    }
+
+    @VisibleForTesting
+    int[] hashCodes() {
+        return hashCodes;
+    }
+
+    /**
+     * Creates a selector from a predicate whose field indexes refer to {@code bucketKeyType}.
+     *
+     * @return empty if some bucket key column is not pinned to a finite set of values, if a column
+     *     is constrained by more than one conjunct, if more than {@link #MAX_VALUES} bucket-key
+     *     rows would have to be enumerated, or if the bucket key type has no fields
+     */
+    public static Optional<BucketSelector> create(
+            Predicate bucketPredicate, RowType bucketKeyType) {
+        int fieldCount = bucketKeyType.getFieldCount();
+        if (fieldCount == 0) {
+            // Nothing to hash on, so pruning is impossible (and assembleRows needs >= 1 column).
+            return Optional.empty();
+        }
+
+        @SuppressWarnings("unchecked")
+        List<Object>[] bucketValues = new List[fieldCount];
+
+        nextAnd:
+        for (Predicate andPredicate : splitAnd(bucketPredicate)) {
+            // A conjunct is usable only if it is an OR of =/IN leaves on one single column.
+            Integer reference = null;
+            List<Object> values = new ArrayList<>();
+            for (Predicate orPredicate : splitOr(andPredicate)) {
+                if (orPredicate instanceof LeafPredicate) {
+                    LeafPredicate leaf = (LeafPredicate) orPredicate;
+                    if (reference == null || reference == leaf.index()) {
+                        reference = leaf.index();
+                        if (leaf.function().equals(Equal.INSTANCE)
+                                || leaf.function().equals(In.INSTANCE)) {
+                            // null literals never compare equal, so they cannot select a row
+                            values.addAll(
+                                    leaf.literals().stream()
+                                            .filter(Objects::nonNull)
+                                            .collect(Collectors.toList()));
+                            continue;
+                        }
+                    }
+                }
+
+                // Unsupported shape: ignore this conjunct. Ignoring a conjunct only widens the
+                // candidate set, so pruning stays safe.
+                continue nextAnd;
+            }
+            if (reference != null) {
+                if (bucketValues[reference] != null) {
+                    // The same column is constrained by several conjuncts; intersecting the value
+                    // sets is not implemented, so give up pruning.
+                    return Optional.empty();
+                }
+
+                bucketValues[reference] = values;
+            }
+        }
+
+        // long: the running product is <= MAX_VALUES before each multiplication, so it cannot
+        // overflow, whereas an int could wrap negative for a huge IN list and bypass the limit.
+        long rowCount = 1;
+        for (List<Object> values : bucketValues) {
+            if (values == null) {
+                // A bucket key column is unconstrained, so any bucket may match.
+                return Optional.empty();
+            }
+
+            rowCount *= values.size();
+            if (rowCount > MAX_VALUES) {
+                return Optional.empty();
+            }
+        }
+
+        RowDataSerializer serializer = new RowDataSerializer(bucketKeyType);
+        List<Integer> hashCodes = new ArrayList<>();
+        assembleRows(
+                bucketValues,
+                columns -> hashCodes.add(hash(columns, serializer)),
+                new ArrayList<>(),
+                0);
+
+        return Optional.of(new BucketSelector(hashCodes.stream().mapToInt(i -> i).toArray()));
+    }
+
+    /** Maps every candidate hash code to its bucket id for the given total bucket count. */
+    private Set<Integer> createBucketSet(int numBucket) {
+        ImmutableSet.Builder<Integer> builder = new ImmutableSet.Builder<>();
+        for (int hash : hashCodes) {
+            // Java's % keeps the sign of the dividend: a negative hash would give a negative id
+            // that never equals a real bucket, and matching files would be wrongly pruned.
+            // NOTE(review): assumes writers assign buckets as Math.abs(hash % numBucket)
+            // (SinkRecordConverter) -- confirm the two stay in sync.
+            builder.add(Math.abs(hash % numBucket));
+        }
+        return builder.build();
+    }
+
+    /** Hash code of the binary row built from {@code columns} with the bucket key serializer. */
+    private static int hash(List<Object> columns, RowDataSerializer serializer) {
+        return serializer.toBinaryRow(GenericRowData.of(columns.toArray())).hashCode();
+    }
+
+    /**
+     * Enumerates the cartesian product of {@code rowValues[columnIndex..]}, passing each complete
+     * row to {@code consumer}. The passed list is {@code stack} itself and is mutated afterwards,
+     * so consumers must not retain it.
+     */
+    private static void assembleRows(
+            List<Object>[] rowValues,
+            Consumer<List<Object>> consumer,
+            List<Object> stack,
+            int columnIndex) {
+        List<Object> columnValues = rowValues[columnIndex];
+        for (Object value : columnValues) {
+            stack.add(value);
+            if (columnIndex == rowValues.length - 1) {
+                // last column, consume row
+                consumer.accept(stack);
+            } else {
+                assembleRows(rowValues, consumer, stack, columnIndex + 1);
+            }
+            stack.remove(stack.size() - 1);
+        }
+    }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
index d823da0d..042b269b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
@@ -72,12 +72,26 @@ public class CompoundPredicate implements Predicate {
}
/** Evaluate the predicate result based on multiple {@link Predicate}s. */
- public interface Function extends Serializable {
+ public abstract static class Function implements Serializable {
- boolean test(Object[] values, List<Predicate> children);
+ public abstract boolean test(Object[] values, List<Predicate>
children);
- boolean test(long rowCount, FieldStats[] fieldStats, List<Predicate>
children);
+ public abstract boolean test(
+ long rowCount, FieldStats[] fieldStats, List<Predicate>
children);
- Optional<Predicate> negate(List<Predicate> children);
+ public abstract Optional<Predicate> negate(List<Predicate> children);
+
+ @Override
+ public int hashCode() {
+ return this.getClass().getName().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o != null && getClass() == o.getClass();
+ }
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
index aa338945..a43e5598 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -27,7 +27,7 @@ import java.util.Optional;
import static
org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
/** A {@link LeafFunction} to eval in. */
-public class In implements LeafFunction {
+public class In extends LeafFunction {
private static final long serialVersionUID = 1L;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
index 00c852cc..62ef23bb 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
@@ -26,11 +26,25 @@ import java.util.List;
import java.util.Optional;
/** Function to test a field with literals. */
-public interface LeafFunction extends Serializable {
+public abstract class LeafFunction implements Serializable {
- boolean test(LogicalType type, Object field, List<Object> literals);
+ public abstract boolean test(LogicalType type, Object field, List<Object>
literals);
- boolean test(LogicalType type, long rowCount, FieldStats fieldStats,
List<Object> literals);
+ public abstract boolean test(
+ LogicalType type, long rowCount, FieldStats fieldStats,
List<Object> literals);
- Optional<LeafFunction> negate();
+ public abstract Optional<LeafFunction> negate();
+
+ @Override
+ public int hashCode() {
+ return this.getClass().getName().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o != null && getClass() == o.getClass();
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
index 72dcfbb1..034c55c8 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import java.util.List;
/** Function to test a field. */
-public abstract class LeafUnaryFunction implements LeafFunction {
+public abstract class LeafUnaryFunction extends LeafFunction {
private static final long serialVersionUID = 1L;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
index d710da0f..f93bbf70 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -27,7 +27,7 @@ import java.util.Optional;
import static
org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
/** A {@link LeafFunction} to eval not in. */
-public class NotIn implements LeafFunction {
+public class NotIn extends LeafFunction {
private static final long serialVersionUID = 1L;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
index dec41e26..e184d931 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.types.logical.LogicalType;
import java.util.List;
/** Function to test a field with a literal. */
-public abstract class NullFalseLeafBinaryFunction implements LeafFunction {
+public abstract class NullFalseLeafBinaryFunction extends LeafFunction {
private static final long serialVersionUID = 1L;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
index 9ba91286..802000a7 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Optional;
/** A {@link CompoundPredicate.Function} to eval or. */
-public class Or implements CompoundPredicate.Function {
+public class Or extends CompoundPredicate.Function {
private static final long serialVersionUID = 1L;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index 93921a50..035809d9 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static java.util.Collections.singletonList;
@@ -141,15 +142,22 @@ public class PredicateBuilder {
public static List<Predicate> splitAnd(Predicate predicate) {
List<Predicate> result = new ArrayList<>();
- splitAnd(predicate, result);
+ splitCompound(And.INSTANCE, predicate, result);
return result;
}
- private static void splitAnd(Predicate predicate, List<Predicate> result) {
+ public static List<Predicate> splitOr(Predicate predicate) {
+ List<Predicate> result = new ArrayList<>();
+ splitCompound(Or.INSTANCE, predicate, result);
+ return result;
+ }
+
+ private static void splitCompound(
+ CompoundPredicate.Function function, Predicate predicate,
List<Predicate> result) {
if (predicate instanceof CompoundPredicate
- && ((CompoundPredicate)
predicate).function().equals(And.INSTANCE)) {
+ && ((CompoundPredicate)
predicate).function().equals(function)) {
for (Predicate child : ((CompoundPredicate) predicate).children())
{
- splitAnd(child, result);
+ splitCompound(function, child, result);
}
} else {
result.add(predicate);
@@ -220,4 +228,50 @@ public class PredicateBuilder {
"Unsupported predicate leaf type " +
literalType.getTypeRoot().name());
}
}
+
+ /**
+ * Picks the predicates that only reference fields in {@code pickedFields}, rewriting their
+ * field indexes from positions in {@code inputFields} to positions in {@code pickedFields}.
+ *
+ * <p>Input fields absent from {@code pickedFields} map to -1 (via {@link List#indexOf}), and
+ * any predicate touching such a field is dropped.
+ */
+ public static List<Predicate> pickTransformFieldMapping(
+ List<Predicate> predicates, List<String> inputFields, List<String>
pickedFields) {
+ return pickTransformFieldMapping(
+ predicates,
inputFields.stream().mapToInt(pickedFields::indexOf).toArray());
+ }
+
+ /**
+ * Maps each predicate with {@link #transformFieldMapping} and keeps only those that could be
+ * fully mapped; the others are silently dropped.
+ *
+ * @param fieldIdxMapping old field index to new field index; negative means unavailable
+ */
+ public static List<Predicate> pickTransformFieldMapping(
+ List<Predicate> predicates, int[] fieldIdxMapping) {
+ List<Predicate> pick = new ArrayList<>();
+ for (Predicate p : predicates) {
+ Optional<Predicate> mapped = transformFieldMapping(p,
fieldIdxMapping);
+ mapped.ifPresent(pick::add);
+ }
+ return pick;
+ }
+
+ /**
+ * Rewrites the field indexes of {@code predicate} through {@code fieldIdxMapping}.
+ *
+ * <p>Any predicate that is not a {@link CompoundPredicate} is cast to {@link LeafPredicate}.
+ * A compound predicate is kept only if all of its children can be mapped, because dropping a
+ * child of an OR would make the filter stricter than the original.
+ *
+ * @param fieldIdxMapping old field index to new field index; negative means unavailable
+ * @return the rewritten predicate, or empty if any leaf references an unavailable field
+ */
+ public static Optional<Predicate> transformFieldMapping(
+ Predicate predicate, int[] fieldIdxMapping) {
+ if (predicate instanceof CompoundPredicate) {
+ CompoundPredicate compoundPredicate = (CompoundPredicate)
predicate;
+ List<Predicate> children = new ArrayList<>();
+ for (Predicate child : compoundPredicate.children()) {
+ Optional<Predicate> mapped = transformFieldMapping(child,
fieldIdxMapping);
+ if (mapped.isPresent()) {
+ children.add(mapped.get());
+ } else {
+ // one unmappable child invalidates the whole compound predicate
+ return Optional.empty();
+ }
+ }
+ return Optional.of(new
CompoundPredicate(compoundPredicate.function(), children));
+ } else {
+ LeafPredicate leafPredicate = (LeafPredicate) predicate;
+ int mapped = fieldIdxMapping[leafPredicate.index()];
+ if (mapped >= 0) {
+ return Optional.of(
+ new LeafPredicate(
+ leafPredicate.function(),
+ leafPredicate.type(),
+ mapped,
+ leafPredicate.literals()));
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 2a6c6b34..bf7d0068 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -134,7 +134,8 @@ public class TableSchema implements Serializable {
return options;
}
- public List<String> bucketKeys() {
+ /** Original bucket keys, maybe empty. */
+ public List<String> originalBucketKeys() {
String key = options.get(BUCKET_KEY.key());
if (StringUtils.isNullOrWhitespaceOnly(key)) {
return Collections.emptyList();
@@ -146,6 +147,12 @@ public class TableSchema implements Serializable {
"Field names %s should contains all bucket keys
%s.",
fieldNames(), bucketKeys));
}
+ if (bucketKeys.stream().anyMatch(partitionKeys::contains)) {
+ throw new RuntimeException(
+ String.format(
+ "Bucket keys %s should not in partition keys %s.",
+ bucketKeys, partitionKeys));
+ }
if (primaryKeys.size() > 0) {
if (!containsAll(primaryKeys, bucketKeys)) {
throw new RuntimeException(
@@ -173,6 +180,17 @@ public class TableSchema implements Serializable {
return projectedLogicalRowType(partitionKeys);
}
+    /**
+     * Returns the row type of the effective bucket key.
+     *
+     * <p>The key is resolved in this order: the configured bucket keys when present, otherwise the
+     * trimmed primary keys, otherwise all fields.
+     */
+    public RowType logicalBucketKeyType() {
+        List<String> configured = originalBucketKeys();
+        if (!configured.isEmpty()) {
+            return projectedLogicalRowType(configured);
+        }
+        List<String> trimmedPks = trimmedPrimaryKeys();
+        return projectedLogicalRowType(trimmedPks.isEmpty() ? fieldNames() : trimmedPks);
+    }
+
public RowType logicalTrimmedPrimaryKeysType() {
return projectedLogicalRowType(trimmedPrimaryKeys());
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 4d343810..16f4be28 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -60,6 +60,7 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
tableSchema.id(),
new CoreOptions(tableSchema.options()),
tableSchema.logicalPartitionType(),
+ tableSchema.logicalBucketKeyType(),
tableSchema.logicalRowType());
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 855602f5..ad90ede0 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -69,6 +69,7 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
tableSchema.id(),
new CoreOptions(tableSchema.options()),
tableSchema.logicalPartitionType(),
+ tableSchema.logicalBucketKeyType(),
tableSchema.logicalRowType(),
countType,
mergeFunction);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 424a1dfb..37a29062 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -50,14 +50,17 @@ import
org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterat
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
/** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with
primary keys. */
public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
+ private static final String KEY_FIELD_PREFIX = "_KEY_";
+
private static final long serialVersionUID = 1L;
private final KeyValueFileStore store;
@@ -66,21 +69,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
Path path, SchemaManager schemaManager, TableSchema tableSchema) {
super(path, tableSchema);
RowType rowType = tableSchema.logicalRowType();
-
- // add _KEY_ prefix to avoid conflict with value
- RowType keyType =
- new RowType(
-
tableSchema.logicalTrimmedPrimaryKeysType().getFields().stream()
- .map(
- f ->
- new RowType.RowField(
- "_KEY_" + f.getName(),
- f.getType(),
-
f.getDescription().orElse(null)))
- .collect(Collectors.toList()));
-
Configuration conf = Configuration.fromMap(tableSchema.options());
-
CoreOptions.MergeEngine mergeEngine =
conf.get(CoreOptions.MERGE_ENGINE);
MergeFunction mergeFunction;
switch (mergeEngine) {
@@ -105,11 +94,25 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
tableSchema.id(),
new CoreOptions(conf),
tableSchema.logicalPartitionType(),
- keyType,
+ addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
+
addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()),
rowType,
mergeFunction);
}
+    /**
+     * Returns a copy of {@code type} whose field names are prefixed with {@link
+     * #KEY_FIELD_PREFIX}, so key fields cannot clash with value fields of the same name. Field
+     * types and descriptions are carried over unchanged.
+     *
+     * <p>Static because it depends only on its argument. It is called from the constructor, where
+     * instance state is not fully initialized yet.
+     */
+    private static RowType addKeyNamePrefix(RowType type) {
+        return new RowType(
+                type.getFields().stream()
+                        .map(
+                                f ->
+                                        new RowType.RowField(
+                                                KEY_FIELD_PREFIX + f.getName(),
+                                                f.getType(),
+                                                f.getDescription().orElse(null)))
+                        .collect(Collectors.toList()));
+    }
+
@Override
public TableScan newScan() {
KeyValueFileStoreScan scan = store.newScan();
@@ -133,16 +136,11 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
// file 2 will be ignored, and the final result will be key =
a, value = 1 while the
// correct result is an empty set
// TODO support value filter
- List<String> trimmedPrimaryKeys =
tableSchema.trimmedPrimaryKeys();
- int[] fieldIdxToKeyIdx =
- tableSchema.fields().stream()
- .mapToInt(f ->
trimmedPrimaryKeys.indexOf(f.name()))
- .toArray();
- List<Predicate> keyFilters = new ArrayList<>();
- for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
- Optional<Predicate> mapped = mapFilterFields(p,
fieldIdxToKeyIdx);
- mapped.ifPresent(keyFilters::add);
- }
+ List<Predicate> keyFilters =
+ pickTransformFieldMapping(
+ splitAnd(predicate),
+ tableSchema.fieldNames(),
+ tableSchema.trimmedPrimaryKeys());
if (keyFilters.size() > 0) {
scan.withKeyFilter(PredicateBuilder.and(keyFilters));
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 9d926ba5..394964ad 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -51,7 +51,7 @@ public class SinkRecordConverter {
numBucket,
tableSchema.logicalRowType(),
tableSchema.projection(tableSchema.partitionKeys()),
- tableSchema.projection(tableSchema.bucketKeys()),
+ tableSchema.projection(tableSchema.originalBucketKeys()),
tableSchema.projection(tableSchema.trimmedPrimaryKeys()),
tableSchema.projection(tableSchema.primaryKeys()));
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index f69ae473..5a9fc05b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -22,8 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.predicate.CompoundPredicate;
-import org.apache.flink.table.store.file.predicate.LeafPredicate;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.TableSchema;
@@ -36,6 +34,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+
/** An abstraction layer above {@link FileStoreScan} to provide input split
generation. */
public abstract class TableScan {
@@ -74,7 +74,7 @@ public abstract class TableScan {
List<Predicate> partitionFilters = new ArrayList<>();
List<Predicate> nonPartitionFilters = new ArrayList<>();
for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
- Optional<Predicate> mapped = mapFilterFields(p,
fieldIdxToPartitionIdx);
+ Optional<Predicate> mapped = transformFieldMapping(p,
fieldIdxToPartitionIdx);
if (mapped.isPresent()) {
partitionFilters.add(mapped.get());
} else {
@@ -136,35 +136,6 @@ public abstract class TableScan {
protected abstract void withNonPartitionFilter(Predicate predicate);
- protected Optional<Predicate> mapFilterFields(Predicate predicate, int[]
fieldIdxMapping) {
- if (predicate instanceof CompoundPredicate) {
- CompoundPredicate compoundPredicate = (CompoundPredicate)
predicate;
- List<Predicate> children = new ArrayList<>();
- for (Predicate child : compoundPredicate.children()) {
- Optional<Predicate> mapped = mapFilterFields(child,
fieldIdxMapping);
- if (mapped.isPresent()) {
- children.add(mapped.get());
- } else {
- return Optional.empty();
- }
- }
- return Optional.of(new
CompoundPredicate(compoundPredicate.function(), children));
- } else {
- LeafPredicate leafPredicate = (LeafPredicate) predicate;
- int mapped = fieldIdxMapping[leafPredicate.index()];
- if (mapped >= 0) {
- return Optional.of(
- new LeafPredicate(
- leafPredicate.function(),
- leafPredicate.type(),
- mapped,
- leafPredicate.literals()));
- } else {
- return Optional.empty();
- }
- }
- }
-
/** Scanning plan containing snapshot ID and input splits. */
public static class Plan {
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 21ed64a7..3a6e39ca 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -124,6 +124,7 @@ public class TestFileStore extends KeyValueFileStore {
options,
partitionType,
keyType,
+ keyType,
valueType,
mergeFunction);
this.root = root;
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
new file mode 100644
index 00000000..aea241b5
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.or;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link BucketSelector}: which combinations of predicates yield a selector, and which
+ * hash codes the selector reports.
+ */
+public class BucketSelectorTest {
+
+ // Three INT fields; passed as the row type to BucketSelector.create in newSelector.
+ private final RowType rowType = RowType.of(new IntType(), new IntType(),
new IntType());
+
+ private final PredicateBuilder builder = new PredicateBuilder(rowType);
+
+ // Two different equalities on the same field 0: no selector is expected.
+ @Test
+ public void testRepeatEqual() {
+ assertThat(newSelector(and(builder.equal(0, 0), builder.equal(0,
1)))).isEmpty();
+ }
+
+ // Only field 0 is constrained, fields 1 and 2 are not: no selector is expected.
+ @Test
+ public void testNotFull() {
+ assertThat(newSelector(and(builder.equal(0, 0)))).isEmpty();
+ }
+
+ // Only a notEqual on field 0 (no equality or IN): no selector is expected.
+ @Test
+ public void testOtherPredicate() {
+ assertThat(newSelector(and(builder.notEqual(0, 0)))).isEmpty();
+ }
+
+ // An OR whose branches reference different fields (0 and 1): no selector is expected.
+ @Test
+ public void testOrIllegal() {
+ assertThat(
+ newSelector(
+ and(
+ or(builder.equal(0, 5),
builder.equal(1, 6)),
+ builder.equal(1, 1),
+ builder.equal(2, 2))))
+ .isEmpty();
+ }
+
+ // One equality per field: the selector reports exactly one hash code.
+ // NOTE(review): expected hash values appear pinned to the current hashing implementation.
+ @Test
+ public void testNormal() {
+ BucketSelector selector =
+ newSelector(and(builder.equal(0, 0), builder.equal(1, 1),
builder.equal(2, 2)))
+ .get();
+ assertThat(selector.hashCodes()).containsExactly(1141287431);
+ }
+
+ // IN with three values on field 0: one hash code per value.
+ @Test
+ public void testIn() {
+ BucketSelector selector =
+ newSelector(
+ and(
+ builder.in(0, Arrays.asList(5, 6, 7)),
+ builder.equal(1, 1),
+ builder.equal(2, 2)))
+ .get();
+ assertThat(selector.hashCodes()).containsExactly(-1272936927,
582056914, -1234868890);
+ }
+
+ // OR of equalities on a single field: same expected hash codes as testIn.
+ @Test
+ public void testOr() {
+ BucketSelector selector =
+ newSelector(
+ and(
+ or(
+ builder.equal(0, 5),
+ builder.equal(0, 6),
+ builder.equal(0, 7)),
+ builder.equal(1, 1),
+ builder.equal(2, 2)))
+ .get();
+ assertThat(selector.hashCodes()).containsExactly(-1272936927,
582056914, -1234868890);
+ }
+
+ // A null literal in the IN list adds no hash code: same expected values as testIn.
+ @Test
+ public void testInNull() {
+ BucketSelector selector =
+ newSelector(
+ and(
+ builder.in(0, Arrays.asList(5, 6, 7,
null)),
+ builder.equal(1, 1),
+ builder.equal(2, 2)))
+ .get();
+ assertThat(selector.hashCodes()).containsExactly(-1272936927,
582056914, -1234868890);
+ }
+
+ // Two IN lists (3 x 2 values): six hash codes, one per value combination.
+ // containsExactly is order-sensitive, so this also pins the enumeration order.
+ @Test
+ public void testMultipleIn() {
+ BucketSelector selector =
+ newSelector(
+ and(
+ builder.in(0, Arrays.asList(5, 6, 7)),
+ builder.in(1, Arrays.asList(1, 8)),
+ builder.equal(2, 2)))
+ .get();
+ assertThat(selector.hashCodes())
+ .containsExactly(
+ -1272936927, -1567268077, 582056914, 2124429589,
-1234868890, 1063796399);
+ }
+
+ // ORs of equalities on fields 0 and 1: same expected hash codes as testMultipleIn.
+ @Test
+ public void testMultipleOr() {
+ BucketSelector selector =
+ newSelector(
+ and(
+ or(
+ builder.equal(0, 5),
+ builder.equal(0, 6),
+ builder.equal(0, 7)),
+ or(builder.equal(1, 1),
builder.equal(1, 8)),
+ builder.equal(2, 2)))
+ .get();
+ assertThat(selector.hashCodes())
+ .containsExactly(
+ -1272936927, -1567268077, 582056914, 2124429589,
-1234868890, 1063796399);
+ }
+
+ /** Creates a selector for {@code predicate} over {@link #rowType}. */
+ private Optional<BucketSelector> newSelector(Predicate predicate) {
+ return BucketSelector.create(predicate, rowType);
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index b6bfd6bf..78cd1c8f 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -162,12 +163,14 @@ public class AppendOnlyFileStoreTableTest extends
FileStoreTableTestBase {
}
@Override
- protected FileStoreTable createFileStoreTable() throws Exception {
+ protected FileStoreTable createFileStoreTable(Consumer<Configuration>
configure)
+ throws Exception {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.FILE_FORMAT, "avro");
conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ configure.accept(conf);
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
schemaManager.commitNewVersion(
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index e5169555..e39ac3dd 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -165,12 +166,14 @@ public class ChangelogValueCountFileStoreTableTest
extends FileStoreTableTestBas
}
@Override
- protected FileStoreTable createFileStoreTable() throws Exception {
+ protected FileStoreTable createFileStoreTable(Consumer<Configuration>
configure)
+ throws Exception {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
conf.set(CoreOptions.FILE_FORMAT, "avro");
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ configure.accept(conf);
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema tableSchema =
schemaManager.commitNewVersion(
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 4c3b999d..0f3c31a1 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -209,15 +209,11 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
write.close();
}
- @Override
- protected FileStoreTable createFileStoreTable() throws Exception {
- return createFileStoreTable(false);
- }
-
protected FileStoreTable createFileStoreTable(boolean changelogFile)
throws Exception {
return createFileStoreTable(conf ->
conf.set(CoreOptions.CHANGELOG_FILE, changelogFile));
}
+ @Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration>
configure)
throws Exception {
Path tablePath = new Path(tempDir.toString());
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index f41de18b..86e28662 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -25,6 +26,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import
org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.sink.TableCommit;
@@ -42,8 +44,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import java.util.function.Function;
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
import static org.assertj.core.api.Assertions.assertThat;
/** Base test class for {@link FileStoreTable}. */
@@ -101,6 +106,33 @@ public abstract class FileStoreTableTestBase {
.hasSameElementsAs(Collections.singletonList("2|21|201"));
}
+    @Test
+    public void testBucketFilter() throws Exception {
+        // Five buckets, bucketed only by field "a".
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 5);
+                            conf.set(BUCKET_KEY, "a");
+                        });
+
+        // Write rows (1, v, v + 1) for v = 1, 3, 5, 7, 9 and commit them together.
+        TableWrite write = table.newWrite();
+        for (int v = 1; v <= 9; v += 2) {
+            write.write(GenericRowData.of(1, v, (long) (v + 1)));
+        }
+        table.newCommit("user").commit("0", write.prepareCommit());
+        write.close();
+
+        // An equality on field 1 must prune the plan to exactly one bucket.
+        List<Split> splits =
+                table.newScan().withFilter(new PredicateBuilder(ROW_TYPE).equal(1, 5)).plan().splits;
+        assertThat(splits).hasSize(1);
+        assertThat(splits.get(0).bucket()).isEqualTo(1);
+    }
+
protected List<String> getResult(
TableRead read,
List<Split> splits,
@@ -141,5 +173,10 @@ public abstract class FileStoreTableTestBase {
return b;
}
- protected abstract FileStoreTable createFileStoreTable() throws Exception;
+    /** Creates the table under test without applying any extra configuration. */
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Consumer<Configuration> noExtraOptions = options -> {};
+        return createFileStoreTable(noExtraOptions);
+    }
+
+ /**
+ * Creates the table under test. The implementations in this change build their default
+ * configuration, then pass it to {@code configure} so a test can override or add options before
+ * the schema is committed.
+ *
+ * @param configure callback applied to the table configuration before creation
+ */
+ protected abstract FileStoreTable
createFileStoreTable(Consumer<Configuration> configure)
+ throws Exception;
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 85c65757..aa96da9e 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -87,7 +88,8 @@ public class WritePreemptMemoryTest extends
FileStoreTableTestBase {
}
@Override
- protected FileStoreTable createFileStoreTable() throws Exception {
+ protected FileStoreTable createFileStoreTable(Consumer<Configuration>
configure)
+ throws Exception {
Path tablePath = new Path(tempDir.toString());
Configuration conf = new Configuration();
conf.set(CoreOptions.PATH, tablePath.toString());
@@ -95,6 +97,7 @@ public class WritePreemptMemoryTest extends
FileStoreTableTestBase {
conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
+ configure.accept(conf);
SchemaManager schemaManager = new SchemaManager(tablePath);
TableSchema schema =
schemaManager.commitNewVersion(
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
index eea054d4..827ef906 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
@@ -47,6 +47,9 @@ public class SinkRecordConverterTest {
assertThatThrownBy(() -> converter("a", "b"))
.hasMessageContaining("Primary keys [b] should contains all
bucket keys [a].");
+
+ assertThatThrownBy(() -> converter("a", "a", "a,b"))
+ .hasMessageContaining("Bucket keys [a] should not in partition
keys [a].");
}
@Test
@@ -69,6 +72,10 @@ public class SinkRecordConverterTest {
}
private SinkRecordConverter converter(String bk, String pk) {
+ return converter("", bk, pk);
+ }
+
+ private SinkRecordConverter converter(String partK, String bk, String pk) {
RowType rowType =
new RowType(
Arrays.asList(
@@ -83,7 +90,9 @@ public class SinkRecordConverterTest {
0,
fields,
TableSchema.currentHighestFieldId(fields),
- Collections.emptyList(),
+ "".equals(partK)
+ ? Collections.emptyList()
+ : Arrays.asList(partK.split(",")),
"".equals(pk) ? Collections.emptyList() :
Arrays.asList(pk.split(",")),
options,
"");