This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit a688465f3394c62928512e23a234e0bcc182e155 Author: Alibaba-HZY <[email protected]> AuthorDate: Wed Sep 10 17:41:53 2025 +0800 [flink] Support general predicates for partition pushdown in Flink Connector (#420) --- LICENSE | 1 + .../org/apache/fluss/predicate/LeafPredicate.java | 5 + .../org/apache/fluss/predicate/SimpleColStats.java | 87 +++ .../java/org/apache/fluss/utils/TypeUtils.java | 6 + .../fluss/predicate/SimpleColStatsTestUtils.java | 38 ++ fluss-flink/fluss-flink-common/pom.xml | 16 + .../apache/fluss/flink/row/FlinkAsFlussRow.java | 11 + .../org/apache/fluss/flink/source/FlinkSource.java | 20 +- .../fluss/flink/source/FlinkTableSource.java | 134 +++-- .../org/apache/fluss/flink/source/FlussSource.java | 4 +- .../source/enumerator/FlinkSourceEnumerator.java | 105 ++-- .../fluss/flink/utils/PredicateConverter.java | 328 ++++++++++++ .../fluss/flink/source/FlinkTableSourceITCase.java | 477 ++++++++++++++++- .../enumerator/FlinkSourceEnumeratorTest.java | 16 +- .../apache/fluss/flink/utils/FlinkTestBase.java | 4 +- .../fluss/flink/utils/PredicateConverterTest.java | 585 +++++++++++++++++++++ fluss-test-coverage/pom.xml | 1 - 17 files changed, 1728 insertions(+), 110 deletions(-) diff --git a/LICENSE b/LICENSE index 481cdf73d..393488043 100644 --- a/LICENSE +++ b/LICENSE @@ -361,6 +361,7 @@ Apache Kafka ./fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java Apache Paimon +./fluss-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java ./fluss-common/src/main/java/org/apache/fluss/predicate/And.java ./fluss-common/src/main/java/org/apache/fluss/predicate/CompareUtils.java ./fluss-common/src/main/java/org/apache/fluss/predicate/CompoundPredicate.java diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java index b52a713ec..640c5093b 100644 --- 
a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java @@ -19,6 +19,7 @@ package org.apache.fluss.predicate; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.DecimalType; import org.apache.fluss.types.LocalZonedTimestampType; import org.apache.fluss.types.TimestampType; @@ -84,6 +85,10 @@ public class LeafPredicate implements Predicate { return new LeafPredicate(function, type, fieldIndex, fieldName, literals); } + public LeafPredicate copyWithNewLiterals(List<Object> literals) { + return new LeafPredicate(function, DataTypes.STRING(), fieldIndex, fieldName, literals); + } + @Override public boolean test(InternalRow row) { return function.test(type, get(row, fieldIndex, type), literals); diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/SimpleColStats.java b/fluss-common/src/main/java/org/apache/fluss/predicate/SimpleColStats.java new file mode 100644 index 000000000..435d565dc --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/SimpleColStats.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.predicate; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * A simple column statistics, supports the following stats. + * + * <ul> + * <li>min: the minimum value of the column + * <li>max: the maximum value of the column + * <li>nullCount: the number of nulls + * </ul> + */ +public class SimpleColStats { + + public static final SimpleColStats NONE = new SimpleColStats(null, null, null); + + @Nullable private final Object min; + @Nullable private final Object max; + private final Long nullCount; + + public SimpleColStats(@Nullable Object min, @Nullable Object max, @Nullable Long nullCount) { + this.min = min; + this.max = max; + this.nullCount = nullCount; + } + + @Nullable + public Object min() { + return min; + } + + @Nullable + public Object max() { + return max; + } + + @Nullable + public Long nullCount() { + return nullCount; + } + + public boolean isNone() { + return min == null && max == null && nullCount == null; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof SimpleColStats)) { + return false; + } + SimpleColStats that = (SimpleColStats) o; + return Objects.equals(min, that.min) + && Objects.equals(max, that.max) + && Objects.equals(nullCount, that.nullCount); + } + + @Override + public int hashCode() { + return Objects.hash(min, max, nullCount); + } + + @Override + public String toString() { + return String.format("{%s, %s, %d}", min, max, nullCount); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java index 4c344aee1..04283c9d9 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java @@ -21,10 +21,12 @@ import org.apache.fluss.row.BinaryString; import 
org.apache.fluss.row.Decimal; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.LocalZonedTimestampType; import org.apache.fluss.types.TimestampType; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.util.TimeZone; /** Type related helper functions. */ public class TypeUtils { @@ -62,6 +64,10 @@ public class TypeUtils { case TIMESTAMP_WITHOUT_TIME_ZONE: TimestampType timestampType = (TimestampType) type; return BinaryStringUtils.toTimestampNtz(str, timestampType.getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type; + return BinaryStringUtils.toTimestampLtz( + str, localZonedTimestampType.getPrecision(), TimeZone.getDefault()); default: throw new UnsupportedOperationException("Unsupported type " + type); } diff --git a/fluss-common/src/test/java/org/apache/fluss/predicate/SimpleColStatsTestUtils.java b/fluss-common/src/test/java/org/apache/fluss/predicate/SimpleColStatsTestUtils.java new file mode 100644 index 000000000..a0e3ef440 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/predicate/SimpleColStatsTestUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.predicate; + +import org.apache.fluss.row.GenericRow; + +/** Utils for testing with {@link SimpleColStats}. */ +public class SimpleColStatsTestUtils { + + public static boolean test(Predicate predicate, long rowCount, SimpleColStats[] fieldStats) { + Object[] min = new Object[fieldStats.length]; + Object[] max = new Object[fieldStats.length]; + Long[] nullCounts = new Long[fieldStats.length]; + for (int i = 0; i < fieldStats.length; i++) { + min[i] = fieldStats[i].min(); + max[i] = fieldStats[i].max(); + nullCounts[i] = fieldStats[i].nullCount(); + } + + return predicate.test(rowCount, GenericRow.of(min), GenericRow.of(max), nullCounts); + } +} diff --git a/fluss-flink/fluss-flink-common/pom.xml b/fluss-flink/fluss-flink-common/pom.xml index c7d59a7f9..aa22e2373 100644 --- a/fluss-flink/fluss-flink-common/pom.xml +++ b/fluss-flink/fluss-flink-common/pom.xml @@ -34,6 +34,7 @@ <properties> <flink.major.version>1.20</flink.major.version> <flink.minor.version>1.20.1</flink.minor.version> + <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> @@ -126,6 +127,21 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.minor.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${flink.minor.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.fluss</groupId> <artifactId>fluss-server</artifactId> diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java index 4b48a4ef4..3593d1f10 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java @@ -17,6 +17,7 @@ package org.apache.fluss.flink.row; +import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.InternalRow; @@ -24,8 +25,10 @@ import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */ public class FlinkAsFlussRow implements InternalRow { @@ -132,4 +135,12 @@ public class FlinkAsFlussRow implements InternalRow { public byte[] getBytes(int pos) { return flinkRow.getBinary(pos); } + + public static Object fromFlinkObject(Object o, DataType type) { + if (o == null) { + return null; + } + return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0) + .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o))); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java index 9687b5efc..bc15390fe 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java @@ -30,10 +30,10 @@ import org.apache.fluss.flink.source.split.SourceSplitBase; import 
org.apache.fluss.flink.source.split.SourceSplitSerializer; import org.apache.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer; import org.apache.fluss.flink.source.state.SourceEnumeratorState; -import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -50,10 +50,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import javax.annotation.Nullable; -import java.util.List; - -import static org.apache.fluss.utils.Preconditions.checkNotNull; - /** Flink source for Fluss. */ public class FlinkSource<OUT> implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable { @@ -69,10 +65,8 @@ public class FlinkSource<OUT> protected final long scanPartitionDiscoveryIntervalMs; private final boolean streaming; private final FlussDeserializationSchema<OUT> deserializationSchema; - - private final List<FieldEqual> partitionFilters; - - private final @Nullable LakeSource<LakeSplit> lakeSource; + @Nullable private final Predicate partitionFilters; + @Nullable private final LakeSource<LakeSplit> lakeSource; public FlinkSource( Configuration flussConf, @@ -85,7 +79,7 @@ public class FlinkSource<OUT> long scanPartitionDiscoveryIntervalMs, FlussDeserializationSchema<OUT> deserializationSchema, boolean streaming, - List<FieldEqual> partitionFilters) { + @Nullable Predicate partitionFilters) { this( flussConf, tablePath, @@ -112,8 +106,8 @@ public class FlinkSource<OUT> long scanPartitionDiscoveryIntervalMs, FlussDeserializationSchema<OUT> deserializationSchema, boolean streaming, - List<FieldEqual> partitionFilters, - LakeSource<LakeSplit> lakeSource) { + @Nullable Predicate partitionFilters, + @Nullable LakeSource<LakeSplit> lakeSource) { this.flussConf = 
flussConf; this.tablePath = tablePath; this.hasPrimaryKey = hasPrimaryKey; @@ -124,7 +118,7 @@ public class FlinkSource<OUT> this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; this.deserializationSchema = deserializationSchema; this.streaming = streaming; - this.partitionFilters = checkNotNull(partitionFilters); + this.partitionFilters = partitionFilters; this.lakeSource = lakeSource; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 0f6c4c7e5..3c4464724 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -26,6 +26,7 @@ import org.apache.fluss.flink.source.lookup.FlinkLookupFunction; import org.apache.fluss.flink.source.lookup.LookupNormalizer; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.flink.utils.FlinkConversions; +import org.apache.fluss.flink.utils.PredicateConverter; import org.apache.fluss.flink.utils.PushdownUtils; import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual; import org.apache.fluss.lake.source.LakeSource; @@ -34,11 +35,15 @@ import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.predicate.GreaterOrEqual; import org.apache.fluss.predicate.LeafPredicate; +import org.apache.fluss.predicate.PartitionPredicateVisitor; import org.apache.fluss.predicate.Predicate; import org.apache.fluss.predicate.PredicateBuilder; +import org.apache.fluss.predicate.PredicateVisitor; +import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; +import org.apache.fluss.utils.PartitionUtils; import 
org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -85,10 +90,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource; import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE; -import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE; import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -140,11 +146,11 @@ public class FlinkTableSource @Nullable private RowLevelModificationType modificationScanType; // count(*) push down - protected boolean selectRowCount = false; + private boolean selectRowCount = false; private long limit = -1; - private List<FieldEqual> partitionFilters = Collections.emptyList(); + @Nullable private Predicate partitionFilters; private final Map<String, String> tableOptions; @@ -478,6 +484,7 @@ public class FlinkTableSource && startupOptions.startupMode == FlinkConnectorOptions.ScanStartupMode.FULL && hasPrimaryKey() && filters.size() == primaryKeyIndexes.length) { + Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes(); List<FieldEqual> fieldEquals = extractFieldEquals( @@ -499,52 +506,72 @@ public class FlinkTableSource } singleRowFilter = lookupRow; return Result.of(acceptedFilters, remainingFilters); - } else if (isPartitioned()) { - // dynamic partition pushdown - List<FieldEqual> fieldEquals = - extractFieldEquals( - filters, - getPartitionKeyTypes(), - acceptedFilters, - remainingFilters, - FLUSS_INTERNAL_VALUE); - - // partitions are filtered by string representations, convert the equals to string first - partitionFilters = 
stringifyFieldEquals(fieldEquals); - + } else if (isPartitioned() + && !RowLevelModificationType.UPDATE.equals(modificationScanType)) { + // apply partition filter pushdown + List<Predicate> converted = new ArrayList<>(); + + List<String> fieldNames = tableOutputType.getFieldNames(); + List<String> partitionKeys = + Arrays.stream(partitionKeyIndexes) + .mapToObj(fieldNames::get) + .collect(Collectors.toList()); + + PredicateVisitor<Boolean> partitionPredicateVisitor = + new PartitionPredicateVisitor(partitionKeys); + + LogicalType[] partitionKeyTypes = + Arrays.stream(partitionKeyIndexes) + .mapToObj(producedDataType.getChildren()::get) + .toArray(LogicalType[]::new); + for (ResolvedExpression filter : filters) { + + Optional<Predicate> predicateOptional = + PredicateConverter.convert( + org.apache.flink.table.types.logical.RowType.of( + partitionKeyTypes, partitionKeys.toArray(new String[0])), + filter); + + if (predicateOptional.isPresent()) { + Predicate p = predicateOptional.get(); + if (!p.visit(partitionPredicateVisitor)) { + remainingFilters.add(filter); + } else { + acceptedFilters.add(filter); + } + // Convert literals in the predicate to string using + // PartitionUtils.convertValueOfType + p = stringifyPredicate(p); + converted.add(p); + } else { + remainingFilters.add(filter); + } + } + partitionFilters = converted.isEmpty() ? 
null : PredicateBuilder.and(converted); // lake source is not null if (lakeSource != null) { - // and exist field equals, push down to lake source - if (!fieldEquals.isEmpty()) { - // convert flink row type to fluss row type - RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType); - - List<Predicate> lakePredicates = new ArrayList<>(); - PredicateBuilder predicateBuilder = new PredicateBuilder(flussRowType); - - for (FieldEqual fieldEqual : fieldEquals) { - lakePredicates.add( - predicateBuilder.equal( - fieldEqual.fieldIndex, fieldEqual.equalValue)); - } + List<Predicate> lakePredicates = new ArrayList<>(); + for (ResolvedExpression filter : filters) { + Optional<Predicate> predicateOptional = + PredicateConverter.convert(tableOutputType, filter); + predicateOptional.ifPresent(lakePredicates::add); + } - if (!lakePredicates.isEmpty()) { - final LakeSource.FilterPushDownResult filterPushDownResult = - lakeSource.withFilters(lakePredicates); - if (filterPushDownResult.acceptedPredicates().size() - != lakePredicates.size()) { - LOG.info( - "LakeSource rejected some partition filters. Falling back to Flink-side filtering."); - // Flink will apply all filters to preserve correctness - return Result.of(Collections.emptyList(), filters); - } + if (!lakePredicates.isEmpty()) { + final LakeSource.FilterPushDownResult filterPushDownResult = + lakeSource.withFilters(lakePredicates); + if (filterPushDownResult.acceptedPredicates().size() != lakePredicates.size()) { + LOG.info( + "LakeSource rejected some partition filters. 
Falling back to Flink-side filtering."); + // Flink will apply all filters to preserve correctness + return Result.of(Collections.emptyList(), filters); } } } return Result.of(acceptedFilters, remainingFilters); - } else { - return Result.of(Collections.emptyList(), filters); } + + return Result.of(Collections.emptyList(), filters); } @Override @@ -651,4 +678,29 @@ public class FlinkTableSource public int[] getPartitionKeyIndexes() { return partitionKeyIndexes; } + + /** + * Converts literals in LeafPredicate to string representation using + * PartitionUtils.convertValueOfType. This is necessary because partition metadata is stored as + * string. + */ + private Predicate stringifyPredicate(Predicate predicate) { + if (predicate instanceof LeafPredicate) { + // Convert literals to string using PartitionUtils.convertValueOfType + List<Object> convertedLiterals = new ArrayList<>(); + for (Object literal : ((LeafPredicate) predicate).literals()) { + if (literal != null) { + String stringValue = + PartitionUtils.convertValueOfType( + literal, ((LeafPredicate) predicate).type().getTypeRoot()); + convertedLiterals.add(BinaryString.fromString(stringValue)); + } else { + convertedLiterals.add(null); + } + } + return ((LeafPredicate) predicate).copyWithNewLiterals(convertedLiterals); + } + + return predicate; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java index 428204d83..427741834 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java @@ -26,8 +26,6 @@ import org.apache.fluss.types.RowType; import javax.annotation.Nullable; -import java.util.Collections; - /** * A Flink DataStream source implementation for reading data from Fluss tables. 
* @@ -82,7 +80,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> { scanPartitionDiscoveryIntervalMs, deserializationSchema, streaming, - Collections.emptyList()); + null); } /** diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 867ff442f..de3731224 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -35,14 +35,16 @@ import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit; import org.apache.fluss.flink.source.split.LogSplit; import org.apache.fluss.flink.source.split.SourceSplitBase; import org.apache.fluss.flink.source.state.SourceEnumeratorState; -import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.types.DataField; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; import org.apache.fluss.utils.ExceptionUtils; import org.apache.flink.annotation.VisibleForTesting; @@ -133,7 +135,7 @@ public class FlinkSourceEnumerator private volatile boolean closed = false; - private final List<FieldEqual> partitionFilters; + @Nullable private final Predicate partitionFilters; @Nullable private final LakeSource<LakeSplit> lakeSource; @@ -146,7 +148,7 @@ public class FlinkSourceEnumerator OffsetsInitializer startingOffsetsInitializer, long 
scanPartitionDiscoveryIntervalMs, boolean streaming, - List<FieldEqual> partitionFilters) { + @Nullable Predicate partitionFilters) { this( tablePath, flussConf, @@ -169,7 +171,7 @@ public class FlinkSourceEnumerator OffsetsInitializer startingOffsetsInitializer, long scanPartitionDiscoveryIntervalMs, boolean streaming, - List<FieldEqual> partitionFilters, + @Nullable Predicate partitionFilters, @Nullable LakeSource<LakeSplit> lakeSource) { this( tablePath, @@ -199,7 +201,7 @@ public class FlinkSourceEnumerator OffsetsInitializer startingOffsetsInitializer, long scanPartitionDiscoveryIntervalMs, boolean streaming, - List<FieldEqual> partitionFilters, + @Nullable Predicate partitionFilters, @Nullable LakeSource<LakeSplit> lakeSource) { this.tablePath = checkNotNull(tablePath); this.flussConf = checkNotNull(flussConf); @@ -216,7 +218,7 @@ public class FlinkSourceEnumerator : new LinkedList<>(pendingHybridLakeFlussSplits); this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; this.streaming = streaming; - this.partitionFilters = checkNotNull(partitionFilters); + this.partitionFilters = partitionFilters; this.stoppingOffsetsInitializer = streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest(); this.lakeSource = lakeSource; @@ -354,30 +356,43 @@ public class FlinkSourceEnumerator /** Apply partition filter. 
*/ private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> partitionInfos) { - if (!partitionFilters.isEmpty()) { - return partitionInfos.stream() - .filter( - partitionInfo -> { - Map<String, String> specMap = - partitionInfo.getPartitionSpec().getSpecMap(); - // use getFields() instead of getFieldNames() to - // avoid collection construction - List<DataField> fields = tableInfo.getRowType().getFields(); - for (FieldEqual filter : partitionFilters) { - String fieldName = fields.get(filter.fieldIndex).getName(); - String partitionValue = specMap.get(fieldName); - if (partitionValue == null - || !filter.equalValue - .toString() - .equals(partitionValue)) { - return false; - } - } - return true; - }) - .collect(Collectors.toList()); - } - return partitionInfos; + if (partitionFilters == null) { + return partitionInfos; + } else { + int originalSize = partitionInfos.size(); + List<PartitionInfo> filteredPartitionInfos = + partitionInfos.stream() + .filter(partition -> partitionFilters.test(toInternalRow(partition))) + .collect(Collectors.toList()); + + int filteredSize = filteredPartitionInfos.size(); + if (originalSize != filteredSize) { + LOG.debug( + "Applied partition filter for table {}: {} partitions filtered down to {} " + + "matching partitions with predicate: {}. 
Matching partitions after filtering: {}", + tablePath, + originalSize, + filteredSize, + partitionFilters, + filteredPartitionInfos); + } else { + LOG.debug( + "Partition filter applied for table {}, but all {} partitions matched the predicate", + tablePath, + originalSize); + } + return filteredPartitionInfos; + } + } + + private static InternalRow toInternalRow(PartitionInfo partitionInfo) { + List<String> partitionValues = + partitionInfo.getResolvedPartitionSpec().getPartitionValues(); + GenericRow genericRow = new GenericRow(partitionValues.size()); + for (int i = 0; i < partitionValues.size(); i++) { + genericRow.setField(i, BinaryString.fromString(partitionValues.get(i))); + } + return genericRow; } /** Init the splits for Fluss. */ @@ -390,17 +405,39 @@ public class FlinkSourceEnumerator LOG.error("Failed to list partitions for {}", tablePath, t); return; } + + LOG.debug( + "Checking partition changes for table {}, found {} partitions", + tablePath, + partitionInfos.size()); + final PartitionChange partitionChange = getPartitionChange(partitionInfos); if (partitionChange.isEmpty()) { + LOG.debug("No partition changes detected for table {}", tablePath); return; } // handle removed partitions - handlePartitionsRemoved(partitionChange.removedPartitions); + if (!partitionChange.removedPartitions.isEmpty()) { + LOG.info( + "Handling {} removed partitions for table {}: {}", + partitionChange.removedPartitions.size(), + tablePath, + partitionChange.removedPartitions); + handlePartitionsRemoved(partitionChange.removedPartitions); + } // handle new partitions - context.callAsync( - () -> initPartitionedSplits(partitionChange.newPartitions), this::handleSplitsAdd); + if (!partitionChange.newPartitions.isEmpty()) { + LOG.info( + "Handling {} new partitions for table {}: {}", + partitionChange.newPartitions.size(), + tablePath, + partitionChange.newPartitions); + context.callAsync( + () -> initPartitionedSplits(partitionChange.newPartitions), + this::handleSplitsAdd); 
+ } } private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionInfos) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java new file mode 100644 index 000000000..1a7939a14 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.flink.row.FlinkAsFlussRow; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.predicate.PredicateBuilder; +import org.apache.fluss.predicate.UnsupportedExpression; +import org.apache.fluss.utils.TypeUtils; + +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionVisitor; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.TypeLiteralExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.RowType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.regex.Pattern; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * Convert Flink {@link Expression} to Fluss {@link Predicate}. 
+ * + * <p>For {@link FieldReferenceExpression}, please use name instead of index, if the project + * pushdown is before and the filter pushdown is after, the index of the filter will be projected. + */ +public class PredicateConverter implements ExpressionVisitor<Predicate> { + + private final PredicateBuilder builder; + + public PredicateConverter(RowType type) { + this(new PredicateBuilder(FlinkConversions.toFlussRowType(type))); + } + + public PredicateConverter(PredicateBuilder builder) { + this.builder = builder; + } + + /** Accepts simple LIKE patterns like "abc%". */ + private static final Pattern BEGIN_PATTERN = Pattern.compile("^[^%_]+%$"); + + private static final Pattern END_PATTERN = Pattern.compile("^%[^%_]+$"); + private static final Pattern CONTAINS_PATTERN = Pattern.compile("^%[^%_]+%$"); + + @Override + public Predicate visit(CallExpression call) { + FunctionDefinition func = call.getFunctionDefinition(); + List<Expression> children = call.getChildren(); + + if (func == BuiltInFunctionDefinitions.AND) { + return PredicateBuilder.and(children.get(0).accept(this), children.get(1).accept(this)); + } else if (func == BuiltInFunctionDefinitions.OR) { + return PredicateBuilder.or(children.get(0).accept(this), children.get(1).accept(this)); + } else if (func == BuiltInFunctionDefinitions.NOT) { + return visitNotFunction(children, builder::equal, builder::equal); + } else if (func == BuiltInFunctionDefinitions.EQUALS) { + return visitBiFunction(children, builder::equal, builder::equal); + } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) { + return visitBiFunction(children, builder::notEqual, builder::notEqual); + } else if (func == BuiltInFunctionDefinitions.GREATER_THAN) { + return visitBiFunction(children, builder::greaterThan, builder::lessThan); + } else if (func == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) { + return visitBiFunction(children, builder::greaterOrEqual, builder::lessOrEqual); + } else if (func == 
BuiltInFunctionDefinitions.LESS_THAN) { + return visitBiFunction(children, builder::lessThan, builder::greaterThan); + } else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) { + return visitBiFunction(children, builder::lessOrEqual, builder::greaterOrEqual); + } else if (func == BuiltInFunctionDefinitions.IN) { + FieldReferenceExpression fieldRefExpr = + extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new); + List<Object> literals = new ArrayList<>(); + for (int i = 1; i < children.size(); i++) { + literals.add(extractLiteral(fieldRefExpr.getOutputDataType(), children.get(i))); + } + return builder.in(builder.indexOf(fieldRefExpr.getName()), literals); + } else if (func == BuiltInFunctionDefinitions.IS_NULL) { + return extractFieldReference(children.get(0)) + .map(FieldReferenceExpression::getName) + .map(builder::indexOf) + .map(builder::isNull) + .orElseThrow(UnsupportedExpression::new); + } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) { + return extractFieldReference(children.get(0)) + .map(FieldReferenceExpression::getName) + .map(builder::indexOf) + .map(builder::isNotNull) + .orElseThrow(UnsupportedExpression::new); + } else if (func == BuiltInFunctionDefinitions.BETWEEN) { + FieldReferenceExpression fieldRefExpr = + extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new); + return builder.between( + builder.indexOf(fieldRefExpr.getName()), children.get(1), children.get(2)); + } else if (func == BuiltInFunctionDefinitions.LIKE) { + FieldReferenceExpression fieldRefExpr = + extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new); + if (fieldRefExpr + .getOutputDataType() + .getLogicalType() + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING) + && builder.indexOf(fieldRefExpr.getName()) != -1) { + String sqlPattern = + Objects.requireNonNull( + extractLiteral( + fieldRefExpr.getOutputDataType(), children.get(1))) + .toString(); + String 
escape = + children.size() <= 2 + ? null + : Objects.requireNonNull( + extractLiteral( + fieldRefExpr.getOutputDataType(), + children.get(2))) + .toString(); + + if (escape == null) { + if (BEGIN_PATTERN.matcher(sqlPattern).matches()) { + String prefix = sqlPattern.substring(0, sqlPattern.length() - 1); + return builder.startsWith(builder.indexOf(fieldRefExpr.getName()), prefix); + } + if (END_PATTERN.matcher(sqlPattern).matches()) { + String suffix = sqlPattern.substring(1); + return builder.endsWith(builder.indexOf(fieldRefExpr.getName()), suffix); + } + if (CONTAINS_PATTERN.matcher(sqlPattern).matches() + && sqlPattern.indexOf('%', 1) == sqlPattern.length() - 1) { + String mid = sqlPattern.substring(1, sqlPattern.length() - 1); + return builder.contains(builder.indexOf(fieldRefExpr.getName()), mid); + } + } + } + } + + // TODO is_xxx, between_xxx, similar, in, not_in, not? + + throw new UnsupportedExpression(); + } + + private Predicate visitBiFunction( + List<Expression> children, + BiFunction<Integer, Object, Predicate> visit1, + BiFunction<Integer, Object, Predicate> visit2) { + Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0)); + if (fieldRefExpr.isPresent() && builder.indexOf(fieldRefExpr.get().getName()) != -1) { + Object literal = + extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1)); + return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), literal); + } else { + fieldRefExpr = extractFieldReference(children.get(1)); + if (fieldRefExpr.isPresent()) { + Object literal = + extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(0)); + return visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), literal); + } + } + + throw new UnsupportedExpression(); + } + + private Predicate visitNotFunction( + List<Expression> children, + BiFunction<Integer, Object, Predicate> visit1, + BiFunction<Integer, Object, Predicate> visit2) { + Optional<FieldReferenceExpression> fieldRefExpr 
= extractFieldReference(children.get(0)); + if (fieldRefExpr.isPresent() && builder.indexOf(fieldRefExpr.get().getName()) != -1) { + + return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), false); + } else { + fieldRefExpr = extractFieldReference(children.get(1)); + if (fieldRefExpr.isPresent()) { + return visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), false); + } + } + + throw new UnsupportedExpression(); + } + + private Optional<FieldReferenceExpression> extractFieldReference(Expression expression) { + if (expression instanceof FieldReferenceExpression) { + return Optional.of((FieldReferenceExpression) expression); + } + return Optional.empty(); + } + + private Object extractLiteral(DataType expectedType, Expression expression) { + LogicalType expectedLogicalType = expectedType.getLogicalType(); + if (!supportsPredicate(expectedLogicalType)) { + throw new UnsupportedExpression(); + } + + if (expression instanceof ValueLiteralExpression) { + ValueLiteralExpression valueExpression = (ValueLiteralExpression) expression; + if (valueExpression.isNull()) { + return null; + } + + DataType actualType = valueExpression.getOutputDataType(); + LogicalType actualLogicalType = actualType.getLogicalType(); + Optional<?> valueOpt = valueExpression.getValueAs(actualType.getConversionClass()); + if (valueOpt.isPresent()) { + Object value = valueOpt.get(); + if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) { + return FlinkAsFlussRow.fromFlinkObject( + DataStructureConverters.getConverter(expectedType) + .toInternalOrNull(value), + expectedType); + } else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) { + try { + return TypeUtils.castFromString( + value.toString(), FlinkConversions.toFlussType(expectedType)); + } catch (Exception ignored) { + } + } + } + } + + throw new UnsupportedExpression(); + } + + private boolean isStringType(LogicalType type) { + switch (type.getTypeRoot()) { + case CHAR: + case 
VARCHAR: + return true; + default: + return false; + } + } + + private boolean supportsPredicate(LogicalType type) { + switch (type.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BOOLEAN: + case BINARY: + case VARBINARY: + case DECIMAL: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return true; + default: + return false; + } + } + + @Override + public Predicate visit(ValueLiteralExpression valueLiteralExpression) { + throw new UnsupportedExpression(); + } + + @Override + public Predicate visit(FieldReferenceExpression fieldReferenceExpression) { + if (fieldReferenceExpression.getOutputDataType().getLogicalType() instanceof BooleanType) { + return builder.equal(builder.indexOf(fieldReferenceExpression.getName()), true); + } + throw new UnsupportedExpression(); + } + + @Override + public Predicate visit(TypeLiteralExpression typeLiteralExpression) { + throw new UnsupportedExpression(); + } + + @Override + public Predicate visit(Expression expression) { + throw new UnsupportedExpression(); + } + + /** + * Try best to convert a {@link ResolvedExpression} to {@link Predicate}. + * + * @param filter a resolved expression + * @return {@link Predicate} if no {@link UnsupportedExpression} thrown. 
+ */ + public static Optional<Predicate> convert(RowType rowType, ResolvedExpression filter) { + try { + return Optional.ofNullable(filter.accept(new PredicateConverter(rowType))); + } catch (UnsupportedExpression e) { + return Optional.empty(); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index c279c15b3..03bdc836c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -25,9 +25,12 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.utils.clock.ManualClock; @@ -604,7 +607,8 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { } List<String> expectedRowValues = - writeRowsToPartition(conn, tablePath, partitionNameById.values()); + writeRowsToPartition( + conn, tablePath, Collections.singleton(partitionNameById.values())); waitUntilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values()); org.apache.flink.util.CloseableIterator<Row> rowIter = @@ -961,6 +965,262 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { assertResultsIgnoreOrder(rowIter, expectedRowValues, true); } + @Test + void testStreamingReadDatePartitionTypePushDown() throws Exception { + tEnv.executeSql( + "create table 
partitioned_table_long" + + " (a int not null, b varchar, c date, primary key (a, c) NOT ENFORCED) partitioned by (c) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_long"); + tEnv.executeSql("alter table partitioned_table_long add partition (c=4)"); + tEnv.executeSql("alter table partitioned_table_long add partition (c=5)"); + + List<String> expectedRowValues = + writeRowsToPartition(conn, tablePath, Arrays.asList(4, 5)).stream() + .filter(s -> s.endsWith("4]")) + .map(s -> s.replace("4]", "1970-01-05]")) + .collect(Collectors.toList()); + waitUntilAllBucketFinishSnapshot( + admin, tablePath, Arrays.asList("1970-01-05", "1970-01-06")); + + String plan = + tEnv.explainSql("select * from partitioned_table_long where c =DATE'1970-01-05'"); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_long, filter=[=(c, 1970-01-05)], project=[a, b]]], fields=[a, b])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter = + tEnv.executeSql("select * from partitioned_table_long where c =DATE'1970-01-05'") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + + @Test + void testStreamingReadTimestampPartitionTypePushDown() throws Exception { + // Add partitions with timestamp values (using epoch milliseconds) + long timestamp1Millis = 1609459200000L; // 2021-01-01 00:00:00 UTC + long timestamp2Millis = 1609545600000L; // 2021-01-02 00:00:00 UTC + TimestampLtz timestamp1 = TimestampLtz.fromEpochMillis(timestamp1Millis); + TimestampLtz timestamp2 = TimestampLtz.fromEpochMillis(timestamp2Millis); + + // Test TIMESTAMP partition type + tEnv.executeSql( + "create table partitioned_table_timestamp" + + " (a int not null, b varchar, c timestamp(3), primary key (a, c) NOT ENFORCED) partitioned by (c) "); + TablePath tablePath2 = TablePath.of(DEFAULT_DB, "partitioned_table_timestamp"); + + // Add partitions with timestamp values + TimestampNtz timestampNtz1 = 
TimestampNtz.fromMillis(timestamp1Millis); + TimestampNtz timestampNtz2 = TimestampNtz.fromMillis(timestamp2Millis); + + tEnv.executeSql( + String.format( + "alter table partitioned_table_timestamp add partition (c=%d)", + timestamp1Millis)); + tEnv.executeSql( + String.format( + "alter table partitioned_table_timestamp add partition (c=%d)", + timestamp2Millis)); + + // Write test data manually for timestamp partitions + List<InternalRow> rows2 = new ArrayList<>(); + List<String> expectedRowValues2 = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + rows2.add(row(i, "v1", timestampNtz1)); + expectedRowValues2.add(String.format("+I[%d, v1, 2021-01-01T00:00]", i)); + } + for (int i = 0; i < 10; i++) { + rows2.add(row(i, "v1", timestampNtz2)); + } + writeRows(conn, tablePath2, rows2, false); + waitUntilAllBucketFinishSnapshot( + admin, tablePath2, Arrays.asList("2021-01-01-00-00-00_", "2021-01-02-00-00-00_")); + + // Test query with TIMESTAMP literal + String plan2 = + tEnv.explainSql( + "select * from partitioned_table_timestamp where c = TIMESTAMP '2021-01-01 00:00:00'"); + assertThat(plan2) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_timestamp, filter=[=(c, 2021-01-01 00:00:00:TIMESTAMP(3))], project=[a, b]]], fields=[a, b])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter2 = + tEnv.executeSql( + "select * from partitioned_table_timestamp where c = TIMESTAMP '2021-01-01 00:00:00'") + .collect(); + assertResultsIgnoreOrder(rowIter2, expectedRowValues2, true); + + // Test TIMESTAMP_LTZ partition type + tEnv.executeSql( + "create table partitioned_table_timestamp_ltz" + + " (a int not null, b varchar, c timestamp_ltz(3), primary key (a, c) NOT ENFORCED) partitioned by (c) "); + TablePath tablePath1 = TablePath.of(DEFAULT_DB, "partitioned_table_timestamp_ltz"); + + tEnv.executeSql( + String.format( + "alter table partitioned_table_timestamp_ltz add partition (c=%d)", + timestamp1Millis)); + tEnv.executeSql( + 
String.format( + "alter table partitioned_table_timestamp_ltz add partition (c=%d)", + timestamp2Millis)); + + // Write test data manually for timestamp_ltz partitions + List<InternalRow> rows1 = new ArrayList<>(); + List<String> expectedRowValues1 = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + rows1.add(row(i, "v1", timestamp1)); + expectedRowValues1.add(String.format("+I[%d, v1, 2021-01-01T00:00:00Z]", i)); + } + for (int i = 0; i < 10; i++) { + rows1.add(row(i, "v1", timestamp2)); + } + writeRows(conn, tablePath1, rows1, false); + waitUntilAllBucketFinishSnapshot( + admin, tablePath1, Arrays.asList("2021-01-01-00-00-00_", "2021-01-02-00-00-00_")); + + // TODO add test for timestamp_ltz + // Test query with TIMESTAMP_LTZ literal + String plan1 = + tEnv.explainSql( + "select * from partitioned_table_timestamp_ltz where c = cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3)) "); + CloseableIterator<Row> collect = + tEnv.executeSql("select cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3))") + .collect(); + org.apache.flink.util.CloseableIterator<Row> rowIter1 = + tEnv.executeSql( + "select * from partitioned_table_timestamp_ltz where c = cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3))") + .collect(); + assertResultsIgnoreOrder(rowIter1, expectedRowValues1, true); + } + + @Test + void testStreamingReadMultiTypePartitionPushDown() throws Exception { + // Create a table with multiple partition fields of different types + tEnv.executeSql( + "create table multi_type_partitioned_table" + + " (id int not null, name varchar, year_val int, month_val varchar, day_val bigint, is_active boolean, " + + "primary key (id, year_val, month_val, day_val, is_active) NOT ENFORCED) " + + "partitioned by (year_val, month_val, day_val, is_active) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "multi_type_partitioned_table"); + + // Add partitions with different types + tEnv.executeSql( + "alter table multi_type_partitioned_table add partition 
(year_val=2025, month_val='01', day_val=15, is_active='true')"); + tEnv.executeSql( + "alter table multi_type_partitioned_table add partition (year_val=2025, month_val='02', day_val=20, is_active='false')"); + tEnv.executeSql( + "alter table multi_type_partitioned_table add partition (year_val=2026, month_val='01', day_val=10, is_active='true')"); + + // Write test data to partitions + List<InternalRow> rows = new ArrayList<>(); + List<String> expectedRowValues = new ArrayList<>(); + + // Data for partition (2025, '01', 15, true) + for (int i = 1; i <= 3; i++) { + rows.add(row(i, "name" + i, 2025, "01", 15L, true)); + expectedRowValues.add(String.format("+I[%d, name%d, 2025, 01, 15, true]", i, i)); + } + + // Data for partition (2025, '02', 20, false) + for (int i = 4; i <= 6; i++) { + rows.add(row(i, "name" + i, 2025, "02", 20L, false)); + } + + // Data for partition (2026, '01', 10, true) + for (int i = 7; i <= 9; i++) { + rows.add(row(i, "name" + i, 2026, "01", 10L, true)); + } + + writeRows(conn, tablePath, rows, false); + waitUntilAllBucketFinishSnapshot( + admin, + tablePath, + Arrays.asList("2025$01$15$true", "2025$02$20$false", "2026$01$10$true")); + + // Test 1: Filter by integer partition field + String plan1 = + tEnv.explainSql("select * from multi_type_partitioned_table where year_val = 2025"); + assertThat(plan1) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(year_val, 2025)], project=[id, name, month_val, day_val, is_active]]], fields=[id, name, month_val, day_val, is_active])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter1 = + tEnv.executeSql("select * from multi_type_partitioned_table where year_val = 2025") + .collect(); + + List<String> expected1 = new ArrayList<>(); + expected1.addAll(expectedRowValues); // Data from (2025, '01', 15, true) + for (int i = 4; i <= 6; i++) { + expected1.add(String.format("+I[%d, name%d, 2025, 02, 20, false]", i, i)); + } + 
assertResultsIgnoreOrder(rowIter1, expected1, true); + + // Test 2: Filter by string partition field + String plan2 = + tEnv.explainSql( + "select * from multi_type_partitioned_table where month_val = '01'"); + assertThat(plan2) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(month_val, _UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], project=[id, name, year_val, day_val, is_active]]], fields=[id, name, year_val, day_val, is_active])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter2 = + tEnv.executeSql("select * from multi_type_partitioned_table where month_val = '01'") + .collect(); + + List<String> expected2 = new ArrayList<>(); + expected2.addAll(expectedRowValues); // Data from (2025, '01', 15, true) + for (int i = 7; i <= 9; i++) { + expected2.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", i, i)); + } + assertResultsIgnoreOrder(rowIter2, expected2, true); + + // Test 3: Filter by bigint partition field + String plan3 = + tEnv.explainSql("select * from multi_type_partitioned_table where day_val = 15"); + assertThat(plan3) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(day_val, 15:BIGINT)], project=[id, name, year_val, month_val, is_active]]], fields=[id, name, year_val, month_val, is_active])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter3 = + tEnv.executeSql("select * from multi_type_partitioned_table where day_val = 15") + .collect(); + assertResultsIgnoreOrder(rowIter3, expectedRowValues, true); + + // Test 4: Filter by boolean partition field + String plan4 = + tEnv.explainSql( + "select * from multi_type_partitioned_table where is_active is true"); + assertThat(plan4) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[is_active]]], fields=[id, name, year_val, month_val, day_val, is_active]"); + + org.apache.flink.util.CloseableIterator<Row> 
rowIter4 = + tEnv.executeSql("select * from multi_type_partitioned_table where is_active = true") + .collect(); + + List<String> expected4 = new ArrayList<>(); + expected4.addAll(expectedRowValues); // Data from (2025, '01', 15, true) + for (int i = 7; i <= 9; i++) { + expected4.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", i, i)); + } + assertResultsIgnoreOrder(rowIter4, expected4, true); + + // Test 5: Complex filter with multiple partition fields + String plan5 = + tEnv.explainSql( + "select * from multi_type_partitioned_table where year_val = 2025 and month_val = '01' and is_active = true"); + assertThat(plan5) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[and(and(=(year_val, 2025), =(month_val, _UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), is_active)], project=[id, name, day_val, is_active]]], fields=[id, name, day_val, is_active])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter5 = + tEnv.executeSql( + "select * from multi_type_partitioned_table where year_val = 2025 and month_val = '01' and is_active = true") + .collect(); + assertResultsIgnoreOrder(rowIter5, expectedRowValues, true); + } + @Test void testStreamingReadMultiPartitionPushDown() throws Exception { tEnv.executeSql( @@ -979,7 +1239,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { .filter(s -> s.contains("2025")) .collect(Collectors.toList()); waitUntilAllBucketFinishSnapshot( - admin, tablePath, Arrays.asList("2025$1", "2025$2", "2025$2")); + admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1")); String plan = tEnv.explainSql("select * from multi_partitioned_table where c ='2025'"); assertThat(plan) @@ -1021,7 +1281,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { } @Test - void testStreamingReadWithCombinedFilters() throws Exception { + void testStreamingReadWithCombinedFilters1() throws Exception { tEnv.executeSql( "create table 
combined_filters_table" + " (a int not null, b varchar, c string, d int, primary key (a, c) NOT ENFORCED) partitioned by (c) "); @@ -1030,12 +1290,16 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { tEnv.executeSql("alter table combined_filters_table add partition (c=2026)"); List<InternalRow> rows = new ArrayList<>(); - List<String> expectedRowValues = new ArrayList<>(); + List<String> expectedRowValues1 = new ArrayList<>(); + List<String> expectedRowValues2 = new ArrayList<>(); for (int i = 0; i < 10; i++) { rows.add(row(i, "v" + i, "2025", i * 100)); if (i % 2 == 0) { - expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100)); + expectedRowValues1.add(String.format("+I[%d, 2025, %d]", i, i * 100)); + } + if (i == 2) { + expectedRowValues2.add(String.format("+I[%d, 2025, %d]", i, i * 100)); } } writeRows(conn, tablePath, rows, false); @@ -1062,7 +1326,24 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { "select a,c,d from combined_filters_table where c ='2025' and d % 200 = 0") .collect(); - assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + assertResultsIgnoreOrder(rowIter, expectedRowValues1, true); + + plan = + tEnv.explainSql( + "select a,c,d from combined_filters_table where c ='2025' and d = 200"); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table, " + + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], " + + "project=[a, d]]], fields=[a, d])"); + + // test column filter、partition filter and flink runtime filter + rowIter = + tEnv.executeSql( + "select a,c,d from combined_filters_table where c ='2025' and d = 200") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues2, true); } @Test @@ -1095,8 +1376,9 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { values[1] = keyValuePairs[1].split("=")[1]; for (int i = 0; i < 10; i++) { - rows.add(row(i, "v1", values[0], values[1])); - 
expectedRowValues.add(String.format("+I[%d, v1, %s, %s]", i, values[0], values[1])); + rows.add(row(i, "v" + i, values[0], values[1])); + expectedRowValues.add( + String.format("+I[%d, v%d, %s, %s]", i, i, values[0], values[1])); } } @@ -1105,6 +1387,183 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { return expectedRowValues; } + @Test + void testStreamingReadPartitionPushDownWithInExpr() throws Exception { + + tEnv.executeSql( + "create table partitioned_table_in" + + " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_in"); + tEnv.executeSql("alter table partitioned_table_in add partition (c=2025)"); + tEnv.executeSql("alter table partitioned_table_in add partition (c=2026)"); + tEnv.executeSql("alter table partitioned_table_in add partition (c=2027)"); + + List<String> expectedRowValues = + writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026", "2027")) + .stream() + .filter(s -> s.contains("2025") || s.contains("2026")) + .collect(Collectors.toList()); + waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027")); + + String query1 = "select * from partitioned_table_in where c in ('2025','2026')"; + String plan = tEnv.explainSql(query1); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))]]], fields=[a, b, c])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query1).collect(); + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + + String query2 = "select * from partitioned_table_in where c ='2025' or c ='2026'"; + plan = tEnv.explainSql(query2); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), =(c, 
_UTF-16LE'2026':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], fields=[a, b, c])"); + rowIter = tEnv.executeSql(query2).collect(); + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + + @Test + void testStreamingReadWithCombinedFiltersAndInExpr() throws Exception { + tEnv.executeSql( + "create table combined_filters_table_in" + + " (a int not null, b varchar, c string, d int, primary key (a, c) NOT ENFORCED) partitioned by (c) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "combined_filters_table_in"); + tEnv.executeSql("alter table combined_filters_table_in add partition (c=2025)"); + tEnv.executeSql("alter table combined_filters_table_in add partition (c=2026)"); + tEnv.executeSql("alter table combined_filters_table_in add partition (c=2027)"); + + List<InternalRow> rows = new ArrayList<>(); + List<String> expectedRowValues = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + rows.add(row(i, "v" + i, "2025", i * 100)); + if (i % 2 == 0) { + expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100)); + } + } + for (int i = 0; i < 10; i++) { + rows.add(row(i, "v" + i, "2026", i * 100)); + if (i % 2 == 0) { + expectedRowValues.add(String.format("+I[%d, 2026, %d]", i, i * 100)); + } + } + + for (int i = 0; i < 10; i++) { + rows.add(row(i, "v" + i, "2027", i * 100)); + } + + writeRows(conn, tablePath, rows, false); + waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027")); + + String query1 = + "select a,c,d from combined_filters_table_in where c in ('2025','2026') and d % 200 = 0"; + String plan = tEnv.explainSql(query1); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))], project=[a, c, d]]], fields=[a, c, d])"); + + // test column filter、partition filter and flink runtime filter + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query1).collect(); + 
assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + + rowIter = + tEnv.executeSql( + "select a,c,d from combined_filters_table_in where (c ='2025' or c ='2026') " + + "and d % 200 = 0") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + + @Test + void testStreamingReadPartitionPushDownWithLikeExpr() throws Exception { + + tEnv.executeSql( + "create table partitioned_table_like" + + " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_like"); + tEnv.executeSql("alter table partitioned_table_like add partition (c=2025)"); + tEnv.executeSql("alter table partitioned_table_like add partition (c=2026)"); + tEnv.executeSql("alter table partitioned_table_like add partition (c=3026)"); + + List<String> allData = + writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026", "3026")); + List<String> expectedRowValues = + allData.stream() + .filter(s -> s.contains("2025") || s.contains("2026")) + .collect(Collectors.toList()); + waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "3026")); + + String query1 = "select * from partitioned_table_like where c like '202%'"; + String plan = tEnv.explainSql(query1); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'202%')]]], fields=[a, b, c])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query1).collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + expectedRowValues = + allData.stream() + .filter(s -> s.contains("2026") || s.contains("3026")) + .collect(Collectors.toList()); + String query2 = "select * from partitioned_table_like where c like '%026'"; + plan = tEnv.explainSql(query2); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, 
_UTF-16LE'%026')]]], fields=[a, b, c])"); + rowIter = tEnv.executeSql(query2).collect(); + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + + expectedRowValues = + allData.stream().filter(s -> s.contains("3026")).collect(Collectors.toList()); + String query3 = "select * from partitioned_table_like where c like '%3026%'"; + plan = tEnv.explainSql(query3); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'%3026%')]]], fields=[a, b, c])"); + rowIter = tEnv.executeSql(query3).collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + + @Test + void testStreamingReadPartitionComplexPushDown() throws Exception { + + tEnv.executeSql( + "create table partitioned_table_complex" + + " (a int not null, b varchar, c string,d string, primary key (a, c, d) NOT ENFORCED) partitioned by (c,d) "); + TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_complex"); + tEnv.executeSql("alter table partitioned_table_complex add partition (c=2025,d=1)"); + tEnv.executeSql("alter table partitioned_table_complex add partition (c=2025,d=2)"); + tEnv.executeSql("alter table partitioned_table_complex add partition (c=2026,d=1)"); + + List<String> allData = + writeRowsToTwoPartition( + tablePath, Arrays.asList("c=2025,d=1", "c=2025,d=2", "c=2026,d=1")); + List<String> expectedRowValues = + allData.stream() + .filter(s -> s.contains("v3") && !s.contains("2025, 2")) + .collect(Collectors.toList()); + waitUntilAllBucketFinishSnapshot( + admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1")); + + String query = + "select * from partitioned_table_complex where a = 3\n" + + " and (c in ('2026') or d like '%1%') " + + " and b like '%v3%'"; + String plan = tEnv.explainSql(query); + assertThat(plan) + .contains( + "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND LIKE(b, '%v3%'))])\n" + + "+- TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d, _UTF-16LE'%1%'))]]], fields=[a, b, c, d])"); + + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + private enum Caching { ENABLE_CACHE, DISABLE_CACHE diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 7218f3891..5dc4af470 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -97,7 +97,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -144,7 +144,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); // register all read for (int i = 0; i < numSubtasks; i++) { @@ -215,7 +215,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -261,7 +261,7 @@ class 
FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -297,7 +297,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + null); enumerator.start(); @@ -357,7 +357,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.earliest(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList(), + null, null); enumerator.start(); @@ -401,7 +401,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList())) { + null)) { Map<Long, String> partitionNameByIds = waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); enumerator.start(); @@ -516,7 +516,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.full(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList())) { + null)) { // test splits for same non-partitioned bucket, should assign to same task TableBucket t1 = new TableBucket(tableId, 0); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java index 2f0b47a4e..633443a08 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java @@ -257,11 +257,11 @@ public class FlinkTestBase extends AbstractTestBase { } public static List<String> writeRowsToPartition( - Connection connection, TablePath tablePath, Collection<String> partitions) + Connection connection, TablePath tablePath, Collection<Object> partitions) throws 
Exception { List<InternalRow> rows = new ArrayList<>(); List<String> expectedRowValues = new ArrayList<>(); - for (String partition : partitions) { + for (Object partition : partitions) { for (int i = 0; i < 10; i++) { rows.add(row(i, "v1", partition)); expectedRowValues.add(String.format("+I[%d, v1, %s]", i, partition)); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java new file mode 100644 index 000000000..13eaf5b86 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.predicate.PredicateBuilder; +import org.apache.fluss.predicate.SimpleColStats; +import org.apache.fluss.predicate.SimpleColStatsTestUtils; +import org.apache.fluss.predicate.UnsupportedExpression; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link PredicateConverter}. 
*/ +public class PredicateConverterTest { + + private static final PredicateBuilder BUILDER = + new PredicateBuilder( + FlinkConversions.toFlussRowType( + new RowType( + Arrays.asList( + new RowType.RowField("long1", new BigIntType()), + new RowType.RowField("double1", new DoubleType()), + new RowType.RowField( + "string1", + DataTypes.STRING().getLogicalType()))))); + + private static final PredicateConverter CONVERTER = new PredicateConverter(BUILDER); + + @MethodSource("provideResolvedExpression") + @ParameterizedTest + public void testVisitAndAutoTypeInference(ResolvedExpression expression, Predicate expected) { + if (expression instanceof CallExpression) { + assertThat(CONVERTER.visit((CallExpression) expression).toString()) + .isEqualTo(expected.toString()); + } else { + assertThatThrownBy(() -> CONVERTER.visit(expression)) + .isInstanceOf(UnsupportedExpression.class); + } + } + + public static Stream<Arguments> provideResolvedExpression() { + FieldReferenceExpression longRefExpr = + new FieldReferenceExpression( + "long1", DataTypes.BIGINT(), Integer.MAX_VALUE, Integer.MAX_VALUE); + ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10); + ValueLiteralExpression intLitExpr2 = new ValueLiteralExpression(20); + long longLit = 10L; + ValueLiteralExpression nullLongLitExpr = + new ValueLiteralExpression(null, DataTypes.BIGINT()); + + FieldReferenceExpression doubleRefExpr = + new FieldReferenceExpression( + "double1", DataTypes.DOUBLE(), Integer.MAX_VALUE, Integer.MAX_VALUE); + ValueLiteralExpression floatLitExpr = new ValueLiteralExpression(3.14f); + double doubleLit = 3.14d; + + FieldReferenceExpression stringRefExpr = + new FieldReferenceExpression( + "string1", DataTypes.STRING(), Integer.MAX_VALUE, Integer.MAX_VALUE); + String stringLit = "haha"; + // same type + ValueLiteralExpression stringLitExpr1 = + new ValueLiteralExpression(stringLit, DataTypes.STRING().notNull()); + // different type, char(4) + ValueLiteralExpression stringLitExpr2 = new 
ValueLiteralExpression(stringLit); + + return Stream.of( + Arguments.of(longRefExpr, null), + Arguments.of(intLitExpr, null), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.IS_NULL, + Collections.singletonList(longRefExpr), + DataTypes.BOOLEAN()), + BUILDER.isNull(0)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.IS_NOT_NULL, + Collections.singletonList(doubleRefExpr), + DataTypes.BOOLEAN()), + BUILDER.isNotNull(1)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.EQUALS, + // test literal on left + Arrays.asList(intLitExpr, longRefExpr), + DataTypes.BOOLEAN()), + BUILDER.equal(0, longLit)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(nullLongLitExpr, longRefExpr), + DataTypes.BOOLEAN()), + BUILDER.equal(0, null)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.NOT_EQUALS, + Arrays.asList(longRefExpr, intLitExpr), + DataTypes.BOOLEAN()), + BUILDER.notEqual(0, longLit)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.NOT_EQUALS, + Arrays.asList(longRefExpr, nullLongLitExpr), + DataTypes.BOOLEAN()), + BUILDER.notEqual(0, null)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.GREATER_THAN, + Arrays.asList(longRefExpr, intLitExpr), + DataTypes.BOOLEAN()), + BUILDER.greaterThan(0, longLit)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.GREATER_THAN, + Arrays.asList(longRefExpr, nullLongLitExpr), + DataTypes.BOOLEAN()), + BUILDER.greaterThan(0, null)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, + Arrays.asList(longRefExpr, intLitExpr), + DataTypes.BOOLEAN()), + BUILDER.greaterOrEqual(0, longLit)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, + Arrays.asList(longRefExpr, nullLongLitExpr), + DataTypes.BOOLEAN()), + 
BUILDER.greaterOrEqual(0, null)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.LESS_THAN, + Arrays.asList(longRefExpr, intLitExpr), + DataTypes.BOOLEAN()), + BUILDER.lessThan(0, longLit)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.LESS_THAN, + Arrays.asList(longRefExpr, nullLongLitExpr), + DataTypes.BOOLEAN()), + BUILDER.lessThan(0, null)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, + Arrays.asList(longRefExpr, intLitExpr), + DataTypes.BOOLEAN()), + BUILDER.lessOrEqual(0, longLit)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, + Arrays.asList(longRefExpr, nullLongLitExpr), + DataTypes.BOOLEAN()), + BUILDER.lessOrEqual(0, null)), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.AND, + Arrays.asList( + CallExpression.permanent( + BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, + Arrays.asList(longRefExpr, intLitExpr), + DataTypes.BOOLEAN()), + CallExpression.permanent( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(doubleRefExpr, floatLitExpr), + DataTypes.BOOLEAN())), + DataTypes.BOOLEAN()), + PredicateBuilder.and( + BUILDER.lessOrEqual(0, longLit), BUILDER.equal(1, doubleLit))), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.OR, + Arrays.asList( + CallExpression.permanent( + BuiltInFunctionDefinitions.NOT_EQUALS, + Arrays.asList(longRefExpr, intLitExpr), + DataTypes.BOOLEAN()), + CallExpression.permanent( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(doubleRefExpr, floatLitExpr), + DataTypes.BOOLEAN())), + DataTypes.BOOLEAN()), + PredicateBuilder.or( + BUILDER.notEqual(0, longLit), BUILDER.equal(1, doubleLit))), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.IN, + Arrays.asList( + longRefExpr, intLitExpr, nullLongLitExpr, intLitExpr2), + DataTypes.BOOLEAN()), + BUILDER.in(0, Arrays.asList(10L, null, 20L))), + Arguments.of( + 
CallExpression.permanent( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(stringLitExpr1, stringRefExpr), + DataTypes.STRING()), + BUILDER.equal(2, BinaryString.fromString("haha"))), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(stringLitExpr2, stringRefExpr), + DataTypes.STRING()), + BUILDER.equal(2, BinaryString.fromString("haha"))), + Arguments.of( + CallExpression.permanent( + BuiltInFunctionDefinitions.BETWEEN, + Arrays.asList(longRefExpr, intLitExpr, intLitExpr2), + DataTypes.BOOLEAN()), + BUILDER.between(0, 10, 20))); + } + + @MethodSource("provideLikeExpressions") + @ParameterizedTest + public void testStartsWith( + CallExpression callExpression, + List<Object[]> valuesList, + List<Boolean> expectedForValues, + List<Long> rowCountList, + List<SimpleColStats[]> statsList, + List<Boolean> expectedForStats) { + + Predicate predicate = + callExpression.accept( + new PredicateConverter(RowType.of(new VarCharType(Integer.MAX_VALUE)))); + IntStream.range(0, valuesList.size()) + .forEach( + i -> + assertThat(predicate.test(GenericRow.of(valuesList.get(i)))) + .isEqualTo(expectedForValues.get(i))); + IntStream.range(0, rowCountList.size()) + .forEach( + i -> + assertThat( + SimpleColStatsTestUtils.test( + predicate, + rowCountList.get(i), + statsList.get(i))) + .isEqualTo(expectedForStats.get(i))); + } + + public static Stream<Arguments> provideLikeExpressions() { + CallExpression expr1 = + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("abd%", STRING())); + List<Object[]> valuesList1 = + Arrays.asList( + new Object[] {null}, + new Object[] {BinaryString.fromString("a")}, + new Object[] {BinaryString.fromString("ab")}, + new Object[] {BinaryString.fromString("abd")}, + new Object[] {BinaryString.fromString("abd%")}, + new Object[] {BinaryString.fromString("abd1")}, + new Object[] {BinaryString.fromString("abde@")}, + new Object[] {BinaryString.fromString("abd_")}, + new Object[] 
{BinaryString.fromString("abd_%")}); + List<Boolean> expectedForValues1 = + Arrays.asList(false, false, false, true, true, true, true, true, true); + List<Long> rowCountList1 = Arrays.asList(0L, 3L, 3L, 3L); + List<SimpleColStats[]> statsList1 = + Arrays.asList( + new SimpleColStats[] {new SimpleColStats(null, null, 0L)}, + new SimpleColStats[] {new SimpleColStats(null, null, 3L)}, + new SimpleColStats[] { + new SimpleColStats( + BinaryString.fromString("ab"), + BinaryString.fromString("abc123"), + 1L) + }, + new SimpleColStats[] { + new SimpleColStats( + BinaryString.fromString("abc"), + BinaryString.fromString("abe"), + 1L) + }); + List<Boolean> expectedForStats1 = Arrays.asList(false, false, false, true); + + // currently, SQL wildcards '[]' and '[^]' are deemed as normal characters in Flink + CallExpression expr3 = + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("[a-c]xyz%", STRING())); + List<Object[]> valuesList3 = + Arrays.asList( + new Object[] {BinaryString.fromString("axyz")}, + new Object[] {BinaryString.fromString("bxyz")}, + new Object[] {BinaryString.fromString("cxyz")}, + new Object[] {BinaryString.fromString("[a-c]xyz")}); + List<Boolean> expectedForValues3 = Arrays.asList(false, false, false, true); + List<Long> rowCountList3 = Collections.singletonList(3L); + List<SimpleColStats[]> statsList3 = + Collections.singletonList( + new SimpleColStats[] { + new SimpleColStats( + BinaryString.fromString("[a-c]xyz"), + BinaryString.fromString("[a-c]xyzz"), + 0L) + }); + List<Boolean> expectedForStats3 = Collections.singletonList(true); + + CallExpression expr4 = + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("[^a-d]xyz%", STRING())); + List<Object[]> valuesList4 = + Arrays.asList( + new Object[] {BinaryString.fromString("exyz")}, + new Object[] {BinaryString.fromString("fxyz")}, + new Object[] {BinaryString.fromString("axyz")}, + new Object[] {BinaryString.fromString("[^a-d]xyz")}); + List<Boolean> 
expectedForValues4 = Arrays.asList(false, false, false, true); + List<Long> rowCountList4 = Collections.singletonList(3L); + List<SimpleColStats[]> statsList4 = + Collections.singletonList( + new SimpleColStats[] { + new SimpleColStats( + BinaryString.fromString("[^a-d]xyz"), + BinaryString.fromString("[^a-d]xyzz"), + 1L) + }); + List<Boolean> expectedForStats4 = Collections.singletonList(true); + + return Stream.of( + Arguments.of( + expr1, + valuesList1, + expectedForValues1, + rowCountList1, + statsList1, + expectedForStats1), + Arguments.of( + expr3, + valuesList3, + expectedForValues3, + rowCountList3, + statsList3, + expectedForStats3), + Arguments.of( + expr4, + valuesList4, + expectedForValues4, + rowCountList4, + statsList4, + expectedForStats4)); + } + + @Test + public void testUnsupportedExpression() { + CallExpression expression = + call( + BuiltInFunctionDefinitions.AND, + call( + BuiltInFunctionDefinitions.EQUALS, + field(0, DataTypes.INT()), + literal(3)), + call( + BuiltInFunctionDefinitions.SIMILAR, + field(1, DataTypes.INT()), + literal(5))); + assertThatThrownBy( + () -> + expression.accept( + new PredicateConverter( + RowType.of(new IntType(), new IntType())))) + .isInstanceOf(UnsupportedExpression.class); + } + + @Test + public void testUnsupportedStartsPatternForLike() { + PredicateConverter converter = + new PredicateConverter(RowType.of(new VarCharType(Integer.MAX_VALUE))); + // starts pattern with '_' as wildcard + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("abc_", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + + // starts pattern like 'abc%xyz' or 'abc_xyz' + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("abc%xyz", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + 
literal("abc_xyz", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + + // starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal( + "=%abc=%%xyz=_", + STRING()), // matches "%abc%(?s:.*)xyz_" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal( + "abc=%%xyz", + STRING()), // matches "abc%(?s:.*)xyz" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal( + "abc=%_xyz", + STRING()), // matches "abc%.xyz" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal( + "abc=_%xyz", + STRING()), // matches "abc_(?s:.*)xyz" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal( + "abc=__xyz", + STRING()), // matches "abc_.xyz" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + + // starts pattern with wildcard '%' at the beginning to escape + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("=%%", STRING()), // matches "%(?s:.*)" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + } + + @Test + public void testUnsupportedEqualsPatternForLike() { + PredicateConverter converter = + new PredicateConverter(RowType.of(new VarCharType(Integer.MAX_VALUE))); + // equals pattern + assertThatThrownBy( + () -> + call( + 
BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("123456", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + + // equals pattern escape '%' + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("12=%45", STRING()), // equals "12%45" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + + // equals pattern escape '_' + assertThatThrownBy( + () -> + call( + BuiltInFunctionDefinitions.LIKE, + field(0, STRING()), + literal("12=_45", STRING()), // equals "12_45" + literal("=", STRING())) + .accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + } + + @Test + public void testUnsupportedType() { + PredicateConverter converter = + new PredicateConverter(RowType.of(new VarCharType(Integer.MAX_VALUE))); + DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class); + CallExpression expression = + call( + BuiltInFunctionDefinitions.EQUALS, + field(0, structType), + literal(Row.of(1), structType)); + assertThatThrownBy(() -> expression.accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + } + + @Test + public void testUnsupportedFieldReferenceExpression() { + PredicateConverter converter = + new PredicateConverter(RowType.of(new VarCharType(Integer.MAX_VALUE))); + DataType structType = DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class); + assertThatThrownBy(() -> field(0, structType).accept(converter)) + .isInstanceOf(UnsupportedExpression.class); + } + + private static FieldReferenceExpression field(int i, DataType type) { + return new FieldReferenceExpression("f" + i, type, Integer.MAX_VALUE, Integer.MAX_VALUE); + } + + private static CallExpression call(FunctionDefinition function, ResolvedExpression... 
args) { + return new CallExpression(false, null, function, Arrays.asList(args), DataTypes.BOOLEAN()); + } +} diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index c4fa15e05..b79232fad 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -394,7 +394,6 @@ <exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude> <exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude> <exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude> - <exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude> <exclude> org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator </exclude>
