This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 680ca7e69f17ca3cf6a041ff53a5170266767219 Author: Jark Wu <[email protected]> AuthorDate: Tue Sep 23 00:16:02 2025 +0800 [flink] Improve the implementation for general predicates partition pushdown and fix bugs --- .../java/org/apache/fluss/predicate/Contains.java | 4 +- .../java/org/apache/fluss/predicate/EndsWith.java | 5 +- .../org/apache/fluss/predicate/LeafPredicate.java | 9 - .../fluss/predicate/PartitionPredicateVisitor.java | 5 +- .../org/apache/fluss/predicate/StartsWith.java | 5 +- .../org/apache/fluss/predicate/PredicateTest.java | 6 +- .../flink/source/Flink118TableSourceITCase.java | 11 +- fluss-flink/fluss-flink-common/pom.xml | 16 -- .../apache/fluss/flink/row/FlinkAsFlussRow.java | 11 - .../fluss/flink/source/FlinkTableSource.java | 91 ++----- .../fluss/flink/utils/PredicateConverter.java | 82 +++--- .../flink/utils/StringifyPredicateVisitor.java | 72 +++++ .../fluss/flink/source/FlinkTableSourceITCase.java | 300 ++++----------------- .../apache/fluss/flink/utils/FlinkTestBase.java | 2 +- .../fluss/flink/utils/PredicateConverterTest.java | 14 +- fluss-test-coverage/pom.xml | 1 + website/docs/engine-flink/reads.md | 21 +- website/src/css/custom.css | 12 +- 18 files changed, 244 insertions(+), 423 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java b/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java index 446a1490c..b50e575dc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java @@ -17,6 +17,7 @@ package org.apache.fluss.predicate; +import org.apache.fluss.row.BinaryString; import org.apache.fluss.types.DataType; import java.util.List; @@ -37,8 +38,7 @@ public class Contains extends NullFalseLeafBinaryFunction { @Override public boolean test(DataType type, Object field, Object patternLiteral) { - String fieldString = field.toString(); - return fieldString.contains((String) 
patternLiteral); + return ((BinaryString) field).contains((BinaryString) patternLiteral); } @Override diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java b/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java index 654cecd3f..8f3d423a2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java @@ -17,6 +17,7 @@ package org.apache.fluss.predicate; +import org.apache.fluss.row.BinaryString; import org.apache.fluss.types.DataType; import java.util.List; @@ -40,8 +41,8 @@ public class EndsWith extends NullFalseLeafBinaryFunction { @Override public boolean test(DataType type, Object field, Object patternLiteral) { - String fieldString = field.toString(); - return fieldString.endsWith((String) patternLiteral); + BinaryString fieldString = (BinaryString) field; + return fieldString.endsWith((BinaryString) patternLiteral); } @Override diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java index 640c5093b..17528f74e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java @@ -19,7 +19,6 @@ package org.apache.fluss.predicate; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.DataType; -import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.DecimalType; import org.apache.fluss.types.LocalZonedTimestampType; import org.apache.fluss.types.TimestampType; @@ -81,14 +80,6 @@ public class LeafPredicate implements Predicate { return literals; } - public LeafPredicate copyWithNewIndex(int fieldIndex) { - return new LeafPredicate(function, type, fieldIndex, fieldName, literals); - } - - public LeafPredicate copyWithNewLiterals(List<Object> literals) { - return new LeafPredicate(function, 
DataTypes.STRING(), fieldIndex, fieldName, literals); - } - @Override public boolean test(InternalRow row) { return function.test(type, get(row, fieldIndex, type), literals); diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java b/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java index 284451b74..47957a058 100644 --- a/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java @@ -23,7 +23,10 @@ import java.util.List; * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ -/** Visit the predicate and check if it only contains partition key's predicate. */ +/** + * Visit the predicate and check if it only contains partition key's predicate. Returns false if it + * contains any predicates that filters on non-partition fields. 
+ */ public class PartitionPredicateVisitor implements PredicateVisitor<Boolean> { private final List<String> partitionKeys; diff --git a/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java b/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java index ce894c69d..c79ac590c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java +++ b/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java @@ -17,6 +17,7 @@ package org.apache.fluss.predicate; +import org.apache.fluss.row.BinaryString; import org.apache.fluss.types.DataType; import java.util.List; @@ -40,8 +41,8 @@ public class StartsWith extends NullFalseLeafBinaryFunction { @Override public boolean test(DataType type, Object field, Object patternLiteral) { - String fieldString = field.toString(); - return fieldString.startsWith((String) patternLiteral); + BinaryString fieldString = (BinaryString) field; + return fieldString.startsWith((BinaryString) patternLiteral); } @Override diff --git a/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java b/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java index 5899a4425..f6284eab9 100644 --- a/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java @@ -325,7 +325,7 @@ public class PredicateTest { @Test public void testEndsWith() { PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType())); - Predicate predicate = builder.endsWith(0, ("bcc")); + Predicate predicate = builder.endsWith(0, fromString("bcc")); GenericRow row = GenericRow.of(fromString("aabbcc")); assertThat(predicate.test(row)).isEqualTo(true); @@ -339,7 +339,7 @@ public class PredicateTest { @Test public void testStartWith() { PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType())); - Predicate predicate = builder.startsWith(0, ("aab")); + Predicate predicate = 
builder.startsWith(0, fromString("aab")); GenericRow row = GenericRow.of(fromString("aabbcc")); assertThat(predicate.test(row)).isEqualTo(true); @@ -357,7 +357,7 @@ public class PredicateTest { @Test public void testContainsWith() { PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType())); - Predicate predicate = builder.contains(0, ("def")); + Predicate predicate = builder.contains(0, fromString("def")); GenericRow row1 = GenericRow.of(fromString("aabbdefcc")); GenericRow row2 = GenericRow.of(fromString("aabbdcefcc")); assertThat(predicate.test(row1)).isEqualTo(true); diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java index 195e87b04..0027a6ff8 100644 --- a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java @@ -17,5 +17,14 @@ package org.apache.fluss.flink.source; +import org.junit.jupiter.api.Disabled; + /** IT case for {@link FlinkTableSource} in Flink 1.18. 
*/ -public class Flink118TableSourceITCase extends FlinkTableSourceITCase {} +class Flink118TableSourceITCase extends FlinkTableSourceITCase { + + @Disabled("Flink 1.18 has a bug in timestamp_ltz type") + @Override + void testStreamingReadAllPartitionTypePushDown() throws Exception { + super.testStreamingReadAllPartitionTypePushDown(); + } +} diff --git a/fluss-flink/fluss-flink-common/pom.xml b/fluss-flink/fluss-flink-common/pom.xml index aa22e2373..c7d59a7f9 100644 --- a/fluss-flink/fluss-flink-common/pom.xml +++ b/fluss-flink/fluss-flink-common/pom.xml @@ -34,7 +34,6 @@ <properties> <flink.major.version>1.20</flink.major.version> <flink.minor.version>1.20.1</flink.minor.version> - <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> @@ -127,21 +126,6 @@ <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> - <version>${flink.minor.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> - <version>${flink.minor.version}</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>org.apache.fluss</groupId> <artifactId>fluss-server</artifactId> diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java index 3593d1f10..4b48a4ef4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java @@ -17,7 +17,6 @@ package org.apache.fluss.flink.row; -import org.apache.fluss.flink.utils.FlinkConversions; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; import 
org.apache.fluss.row.InternalRow; @@ -25,10 +24,8 @@ import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.types.DataType; /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */ public class FlinkAsFlussRow implements InternalRow { @@ -135,12 +132,4 @@ public class FlinkAsFlussRow implements InternalRow { public byte[] getBytes(int pos) { return flinkRow.getBinary(pos); } - - public static Object fromFlinkObject(Object o, DataType type) { - if (o == null) { - return null; - } - return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0) - .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o))); - } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 3c4464724..c278e1572 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -26,7 +26,6 @@ import org.apache.fluss.flink.source.lookup.FlinkLookupFunction; import org.apache.fluss.flink.source.lookup.LookupNormalizer; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.flink.utils.FlinkConversions; -import org.apache.fluss.flink.utils.PredicateConverter; import org.apache.fluss.flink.utils.PushdownUtils; import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual; import org.apache.fluss.lake.source.LakeSource; @@ -39,11 +38,9 @@ import org.apache.fluss.predicate.PartitionPredicateVisitor; import org.apache.fluss.predicate.Predicate; import 
org.apache.fluss.predicate.PredicateBuilder; import org.apache.fluss.predicate.PredicateVisitor; -import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; -import org.apache.fluss.utils.PartitionUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -91,11 +88,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource; +import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate; import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE; import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals; +import static org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate; import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; import static org.apache.fluss.utils.Preconditions.checkNotNull; import static org.apache.fluss.utils.Preconditions.checkState; @@ -506,43 +504,32 @@ public class FlinkTableSource } singleRowFilter = lookupRow; return Result.of(acceptedFilters, remainingFilters); - } else if (isPartitioned() - && !RowLevelModificationType.UPDATE.equals(modificationScanType)) { + } else if (isPartitioned()) { // apply partition filter pushdown List<Predicate> converted = new ArrayList<>(); - List<String> fieldNames = tableOutputType.getFieldNames(); - List<String> partitionKeys = - Arrays.stream(partitionKeyIndexes) - .mapToObj(fieldNames::get) - .collect(Collectors.toList()); + RowType partitionRowType = + FlinkConversions.toFlussRowType(tableOutputType).project(partitionKeyIndexes); + PredicateVisitor<Boolean> checksOnlyPartitionKeys = + new PartitionPredicateVisitor(partitionRowType.getFieldNames()); - 
PredicateVisitor<Boolean> partitionPredicateVisitor = - new PartitionPredicateVisitor(partitionKeys); - - LogicalType[] partitionKeyTypes = - Arrays.stream(partitionKeyIndexes) - .mapToObj(producedDataType.getChildren()::get) - .toArray(LogicalType[]::new); for (ResolvedExpression filter : filters) { Optional<Predicate> predicateOptional = - PredicateConverter.convert( - org.apache.flink.table.types.logical.RowType.of( - partitionKeyTypes, partitionKeys.toArray(new String[0])), - filter); + convertToFlussPredicate(partitionRowType, filter); if (predicateOptional.isPresent()) { Predicate p = predicateOptional.get(); - if (!p.visit(partitionPredicateVisitor)) { + // partition pushdown can only guarantee to filter out partitions matches the + // predicate, but can't guarantee to filter out all data matches to + // non-partition filter in the partition + if (!p.visit(checksOnlyPartitionKeys)) { remainingFilters.add(filter); } else { acceptedFilters.add(filter); } - // Convert literals in the predicate to string using - // PartitionUtils.convertValueOfType - p = stringifyPredicate(p); - converted.add(p); + // Convert literals in the predicate to partition string + converted.add(stringifyPartitionPredicate(p)); } else { remainingFilters.add(filter); } @@ -553,7 +540,7 @@ public class FlinkTableSource List<Predicate> lakePredicates = new ArrayList<>(); for (ResolvedExpression filter : filters) { Optional<Predicate> predicateOptional = - PredicateConverter.convert(tableOutputType, filter); + convertToFlussPredicate(tableOutputType, filter); predicateOptional.ifPresent(lakePredicates::add); } @@ -631,24 +618,6 @@ public class FlinkTableSource return pkTypes; } - private Map<Integer, LogicalType> getPartitionKeyTypes() { - Map<Integer, LogicalType> partitionKeyTypes = new HashMap<>(); - for (int index : partitionKeyIndexes) { - partitionKeyTypes.put(index, tableOutputType.getTypeAt(index)); - } - return partitionKeyTypes; - } - - private List<FieldEqual> 
stringifyFieldEquals(List<FieldEqual> fieldEquals) { - List<FieldEqual> serialize = new ArrayList<>(); - for (FieldEqual fieldEqual : fieldEquals) { - // revisit this again when we support more data types for partition key - serialize.add( - new FieldEqual(fieldEqual.fieldIndex, (fieldEqual.equalValue).toString())); - } - return serialize; - } - // projection from pk_field_index to index_in_pk private int[] getKeyRowProjection() { int[] projection = new int[tableOutputType.getFieldCount()]; @@ -673,34 +642,4 @@ public class FlinkTableSource public int[] getBucketKeyIndexes() { return bucketKeyIndexes; } - - @VisibleForTesting - public int[] getPartitionKeyIndexes() { - return partitionKeyIndexes; - } - - /** - * Converts literals in LeafPredicate to string representation using - * PartitionUtils.convertValueOfType. This is necessary because partition metadata is stored as - * string. - */ - private Predicate stringifyPredicate(Predicate predicate) { - if (predicate instanceof LeafPredicate) { - // Convert literals to string using PartitionUtils.convertValueOfType - List<Object> convertedLiterals = new ArrayList<>(); - for (Object literal : ((LeafPredicate) predicate).literals()) { - if (literal != null) { - String stringValue = - PartitionUtils.convertValueOfType( - literal, ((LeafPredicate) predicate).type().getTypeRoot()); - convertedLiterals.add(BinaryString.fromString(stringValue)); - } else { - convertedLiterals.add(null); - } - } - return ((LeafPredicate) predicate).copyWithNewLiterals(convertedLiterals); - } - - return predicate; - } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java index 1a7939a14..861381c14 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java +++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java @@ -17,12 +17,16 @@ package org.apache.fluss.flink.utils; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.flink.row.FlinkAsFlussRow; import org.apache.fluss.predicate.Predicate; import org.apache.fluss.predicate.PredicateBuilder; import org.apache.fluss.predicate.UnsupportedExpression; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; import org.apache.fluss.utils.TypeUtils; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; @@ -34,7 +38,6 @@ import org.apache.flink.table.expressions.ValueLiteralExpression; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.RowType; @@ -62,14 +65,15 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { private final PredicateBuilder builder; - public PredicateConverter(RowType type) { - this(new PredicateBuilder(FlinkConversions.toFlussRowType(type))); - } - public PredicateConverter(PredicateBuilder builder) { this.builder = builder; } + @VisibleForTesting + PredicateConverter(RowType type) { + this(new PredicateBuilder(FlinkConversions.toFlussRowType(type))); + } + /** Accepts simple LIKE patterns like "abc%". 
*/ private static final Pattern BEGIN_PATTERN = Pattern.compile("^[^%_]+%$"); @@ -85,8 +89,6 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { return PredicateBuilder.and(children.get(0).accept(this), children.get(1).accept(this)); } else if (func == BuiltInFunctionDefinitions.OR) { return PredicateBuilder.or(children.get(0).accept(this), children.get(1).accept(this)); - } else if (func == BuiltInFunctionDefinitions.NOT) { - return visitNotFunction(children, builder::equal, builder::equal); } else if (func == BuiltInFunctionDefinitions.EQUALS) { return visitBiFunction(children, builder::equal, builder::equal); } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) { @@ -119,6 +121,12 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { .map(builder::indexOf) .map(builder::isNotNull) .orElseThrow(UnsupportedExpression::new); + } else if (func == BuiltInFunctionDefinitions.NOT) { + return extractFieldReference(children.get(0)) + .map(FieldReferenceExpression::getName) + .map(builder::indexOf) + .map(idx -> builder.equal(idx, Boolean.FALSE)) + .orElseThrow(UnsupportedExpression::new); } else if (func == BuiltInFunctionDefinitions.BETWEEN) { FieldReferenceExpression fieldRefExpr = extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new); @@ -151,22 +159,28 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { if (escape == null) { if (BEGIN_PATTERN.matcher(sqlPattern).matches()) { String prefix = sqlPattern.substring(0, sqlPattern.length() - 1); - return builder.startsWith(builder.indexOf(fieldRefExpr.getName()), prefix); + return builder.startsWith( + builder.indexOf(fieldRefExpr.getName()), + BinaryString.fromString(prefix)); } if (END_PATTERN.matcher(sqlPattern).matches()) { String suffix = sqlPattern.substring(1); - return builder.endsWith(builder.indexOf(fieldRefExpr.getName()), suffix); + return builder.endsWith( + builder.indexOf(fieldRefExpr.getName()), + 
BinaryString.fromString(suffix)); } if (CONTAINS_PATTERN.matcher(sqlPattern).matches() && sqlPattern.indexOf('%', 1) == sqlPattern.length() - 1) { String mid = sqlPattern.substring(1, sqlPattern.length() - 1); - return builder.contains(builder.indexOf(fieldRefExpr.getName()), mid); + return builder.contains( + builder.indexOf(fieldRefExpr.getName()), + BinaryString.fromString(mid)); } } } } - // TODO is_xxx, between_xxx, similar, in, not_in, not? + // TODO is_true, is_false, between_xxx, similar, not_in? throw new UnsupportedExpression(); } @@ -192,24 +206,6 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { throw new UnsupportedExpression(); } - private Predicate visitNotFunction( - List<Expression> children, - BiFunction<Integer, Object, Predicate> visit1, - BiFunction<Integer, Object, Predicate> visit2) { - Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0)); - if (fieldRefExpr.isPresent() && builder.indexOf(fieldRefExpr.get().getName()) != -1) { - - return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), false); - } else { - fieldRefExpr = extractFieldReference(children.get(1)); - if (fieldRefExpr.isPresent()) { - return visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), false); - } - } - - throw new UnsupportedExpression(); - } - private Optional<FieldReferenceExpression> extractFieldReference(Expression expression) { if (expression instanceof FieldReferenceExpression) { return Optional.of((FieldReferenceExpression) expression); @@ -235,7 +231,7 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { if (valueOpt.isPresent()) { Object value = valueOpt.get(); if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) { - return FlinkAsFlussRow.fromFlinkObject( + return fromFlinkObject( DataStructureConverters.getConverter(expectedType) .toInternalOrNull(value), expectedType); @@ -252,14 +248,12 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> { throw new UnsupportedExpression(); } - private boolean isStringType(LogicalType type) { - switch (type.getTypeRoot()) { - case CHAR: - case VARCHAR: - return true; - default: - return false; + private static Object fromFlinkObject(Object o, DataType type) { + if (o == null) { + return null; } + return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0) + .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o))); } private boolean supportsPredicate(LogicalType type) { @@ -296,9 +290,6 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { @Override public Predicate visit(FieldReferenceExpression fieldReferenceExpression) { - if (fieldReferenceExpression.getOutputDataType().getLogicalType() instanceof BooleanType) { - return builder.equal(builder.indexOf(fieldReferenceExpression.getName()), true); - } throw new UnsupportedExpression(); } @@ -318,11 +309,18 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> { * @param filter a resolved expression * @return {@link Predicate} if no {@link UnsupportedExpression} thrown. 
*/ - public static Optional<Predicate> convert(RowType rowType, ResolvedExpression filter) { + public static Optional<Predicate> convertToFlussPredicate( + org.apache.fluss.types.RowType rowType, ResolvedExpression filter) { try { - return Optional.ofNullable(filter.accept(new PredicateConverter(rowType))); + return Optional.ofNullable( + filter.accept(new PredicateConverter(new PredicateBuilder(rowType)))); } catch (UnsupportedExpression e) { return Optional.empty(); } } + + public static Optional<Predicate> convertToFlussPredicate( + RowType rowType, ResolvedExpression filter) { + return convertToFlussPredicate(FlinkConversions.toFlussRowType(rowType), filter); + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java new file mode 100644 index 000000000..1d0f39457 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.utils; + +import org.apache.fluss.predicate.CompoundPredicate; +import org.apache.fluss.predicate.LeafPredicate; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.predicate.PredicateVisitor; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.utils.PartitionUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * A {@link PredicateVisitor} that converts all literals in {@link LeafPredicate} to string type in + * the string format of {@link PartitionUtils#convertValueOfType(Object, DataTypeRoot)}. This is + * necessary because partition metadata is stored as string. + */ +public class StringifyPredicateVisitor implements PredicateVisitor<Predicate> { + + public static Predicate stringifyPartitionPredicate(Predicate predicate) { + StringifyPredicateVisitor visitor = new StringifyPredicateVisitor(); + return predicate.visit(visitor); + } + + @Override + public Predicate visit(LeafPredicate predicate) { + List<Object> convertedLiterals = new ArrayList<>(); + for (Object literal : predicate.literals()) { + if (literal != null) { + String stringValue = + PartitionUtils.convertValueOfType(literal, predicate.type().getTypeRoot()); + convertedLiterals.add(BinaryString.fromString(stringValue)); + } else { + convertedLiterals.add(null); + } + } + return new LeafPredicate( + predicate.function(), + DataTypes.STRING(), + predicate.index(), + predicate.fieldName(), + convertedLiterals); + } + + @Override + public Predicate visit(CompoundPredicate predicate) { + List<Predicate> newChildren = new ArrayList<>(); + for (Predicate child : predicate.children()) { + newChildren.add(child.visit(this)); + } + return new CompoundPredicate(predicate.function(), newChildren); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 03bdc836c..f9577af97 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -25,12 +25,9 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; -import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; -import org.apache.fluss.row.TimestampLtz; -import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.utils.clock.ManualClock; @@ -607,8 +604,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { } List<String> expectedRowValues = - writeRowsToPartition( - conn, tablePath, Collections.singleton(partitionNameById.values())); + writeRowsToPartition(conn, tablePath, partitionNameById.values()); waitUntilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values()); org.apache.flink.util.CloseableIterator<Row> rowIter = @@ -966,261 +962,63 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { } @Test - void testStreamingReadDatePartitionTypePushDown() throws Exception { + void testStreamingReadAllPartitionTypePushDown() throws Exception { tEnv.executeSql( - "create table partitioned_table_long" - + " (a int not null, b varchar, c date, primary key (a, c) NOT ENFORCED) partitioned by (c) "); - TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_long"); - tEnv.executeSql("alter table partitioned_table_long add partition (c=4)"); - tEnv.executeSql("alter table partitioned_table_long add partition (c=5)"); + "CREATE TABLE 
all_type_partitioned_table" + + " (id INT, " + + " p_bool BOOLEAN, p_int INT, p_bigint BIGINT, " + + " p_bytes BYTES, p_string STRING, " + + " p_float FLOAT, p_double DOUBLE, " + + " p_date DATE, p_time TIME, p_ts_ntz TIMESTAMP, " + + " p_ts_ltz TIMESTAMP WITH LOCAL TIME ZONE) " + + "PARTITIONED BY (p_bool, p_int, p_bigint, p_bytes, p_string, " + + " p_float, p_double, p_date, p_time, p_ts_ntz, p_ts_ltz) "); + tEnv.executeSql( + "INSERT INTO all_type_partitioned_table VALUES " + + "(1, false, 10, 99999, CAST('Hi' AS VARBINARY), 'hello', 12.5, 7.88, DATE '2025-10-12', TIME '12:55:00', TIMESTAMP '2025-10-12 12:55:00.001', TO_TIMESTAMP_LTZ(4001, 3)), " + + "(2, true, 12, 99998, CAST('Hi' AS VARBINARY), 'world', 13.6, 8.99, DATE '2025-10-11', TIME '12:55:12', TIMESTAMP '2025-10-12 12:55:01.001', TO_TIMESTAMP_LTZ(5001, 3)), " + + "(3, false, 13, 99997, CAST('Hi' AS VARBINARY), 'Hi', 17.44, 4.444, DATE '2025-10-10', TIME '12:55:32', TIMESTAMP '2025-10-12 12:55:02.001', TO_TIMESTAMP_LTZ(6001, 3)), " + + "(4, true, 14, 99996, CAST('Hi' AS VARBINARY), 'Ciao', 11.0, 9.4211, DATE '2025-10-09', TIME '12:00:44', TIMESTAMP '2025-10-12 12:55:03.001', TO_TIMESTAMP_LTZ(7001, 3)); ") + .await(); - List<String> expectedRowValues = - writeRowsToPartition(conn, tablePath, Arrays.asList(4, 5)).stream() - .filter(s -> s.endsWith("4]")) - .map(s -> s.replace("4]", "1970-01-05]")) - .collect(Collectors.toList()); - waitUntilAllBucketFinishSnapshot( - admin, tablePath, Arrays.asList("1970-01-05", "1970-01-06")); + List<String> expectedShowPartitionsResult = + Arrays.asList( + "+I[p_bool=false/p_int=10/p_bigint=99999/p_bytes=4869/p_string=hello/p_float=12_5/p_double=7_88/p_date=2025-10-12/p_time=12-55-00_000/p_ts_ntz=2025-10-12-12-55-00_001/p_ts_ltz=1970-01-01-00-00-04_001]", + "+I[p_bool=true/p_int=12/p_bigint=99998/p_bytes=4869/p_string=world/p_float=13_6/p_double=8_99/p_date=2025-10-11/p_time=12-55-12_000/p_ts_ntz=2025-10-12-12-55-01_001/p_ts_ltz=1970-01-01-00-00-05_001]", + 
"+I[p_bool=false/p_int=13/p_bigint=99997/p_bytes=4869/p_string=Hi/p_float=17_44/p_double=4_444/p_date=2025-10-10/p_time=12-55-32_000/p_ts_ntz=2025-10-12-12-55-02_001/p_ts_ltz=1970-01-01-00-00-06_001]", + "+I[p_bool=true/p_int=14/p_bigint=99996/p_bytes=4869/p_string=Ciao/p_float=11_0/p_double=9_4211/p_date=2025-10-09/p_time=12-00-44_000/p_ts_ntz=2025-10-12-12-55-03_001/p_ts_ltz=1970-01-01-00-00-07_001]"); + CloseableIterator<Row> showPartitionIterator = + tEnv.executeSql("show partitions all_type_partitioned_table").collect(); + assertResultsIgnoreOrder(showPartitionIterator, expectedShowPartitionsResult, true); - String plan = - tEnv.explainSql("select * from partitioned_table_long where c =DATE'1970-01-05'"); + String query = + "select * from all_type_partitioned_table where " + + "p_bool is false and p_int not in (12) and p_bigint in (99999) and p_bytes=CAST('Hi' AS VARBINARY) " + + "and p_string='hello' and p_float=CAST(12.5 AS FLOAT) and p_double=7.88 and p_date=DATE '2025-10-12' " + + "and p_time=TIME '12:55:00' and p_ts_ntz=TIMESTAMP '2025-10-12 12:55:00.001' " + + "and p_ts_ltz=TO_TIMESTAMP_LTZ(4001, 3)"; + String plan = tEnv.explainSql(query); assertThat(plan) .contains( - "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_long, filter=[=(c, 1970-01-05)], project=[a, b]]], fields=[a, b])"); + "TableSourceScan(table=[[testcatalog, defaultdb, all_type_partitioned_table, " + + "filter=[and(and(and(and(and(and(and(and(and(and(<>(p_int, 12), " + + "=(p_bigint, 99999:BIGINT)), =(p_bytes, X'4869':VARBINARY(2147483647))), " + + "=(p_string, _UTF-16LE'hello':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), " + + "=(p_float, 1.25E1:FLOAT)), =(p_double, 7.88E0:DOUBLE)), =(p_date, 2025-10-12)), " + + "=(p_time, 12:55:00)), =(p_ts_ntz, 2025-10-12 12:55:00.001:TIMESTAMP(6))), " + + "=(p_ts_ltz, 1970-01-01 00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))], " + + "project=[id, p_bool, p_int]]], fields=[id, p_bool, p_int])") + // all filter 
conditions should be pushed down + .doesNotContain("where="); - org.apache.flink.util.CloseableIterator<Row> rowIter = - tEnv.executeSql("select * from partitioned_table_long where c =DATE'1970-01-05'") - .collect(); + List<String> expectedRowValues = + Collections.singletonList( + "+I[1, false, 10, 99999, [72, 105], hello, 12.5, 7.88, 2025-10-12, 12:55, 2025-10-12T12:55:00.001, 1970-01-01T00:00:04.001Z]"); + org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect(); assertResultsIgnoreOrder(rowIter, expectedRowValues, true); } - @Test - void testStreamingReadTimestampPartitionTypePushDown() throws Exception { - // Add partitions with timestamp values (using epoch milliseconds) - long timestamp1Millis = 1609459200000L; // 2021-01-01 00:00:00 UTC - long timestamp2Millis = 1609545600000L; // 2021-01-02 00:00:00 UTC - TimestampLtz timestamp1 = TimestampLtz.fromEpochMillis(timestamp1Millis); - TimestampLtz timestamp2 = TimestampLtz.fromEpochMillis(timestamp2Millis); - - // Test TIMESTAMP partition type - tEnv.executeSql( - "create table partitioned_table_timestamp" - + " (a int not null, b varchar, c timestamp(3), primary key (a, c) NOT ENFORCED) partitioned by (c) "); - TablePath tablePath2 = TablePath.of(DEFAULT_DB, "partitioned_table_timestamp"); - - // Add partitions with timestamp values - TimestampNtz timestampNtz1 = TimestampNtz.fromMillis(timestamp1Millis); - TimestampNtz timestampNtz2 = TimestampNtz.fromMillis(timestamp2Millis); - - tEnv.executeSql( - String.format( - "alter table partitioned_table_timestamp add partition (c=%d)", - timestamp1Millis)); - tEnv.executeSql( - String.format( - "alter table partitioned_table_timestamp add partition (c=%d)", - timestamp2Millis)); - - // Write test data manually for timestamp partitions - List<InternalRow> rows2 = new ArrayList<>(); - List<String> expectedRowValues2 = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - rows2.add(row(i, "v1", timestampNtz1)); - 
expectedRowValues2.add(String.format("+I[%d, v1, 2021-01-01T00:00]", i)); - } - for (int i = 0; i < 10; i++) { - rows2.add(row(i, "v1", timestampNtz2)); - } - writeRows(conn, tablePath2, rows2, false); - waitUntilAllBucketFinishSnapshot( - admin, tablePath2, Arrays.asList("2021-01-01-00-00-00_", "2021-01-02-00-00-00_")); - - // Test query with TIMESTAMP literal - String plan2 = - tEnv.explainSql( - "select * from partitioned_table_timestamp where c = TIMESTAMP '2021-01-01 00:00:00'"); - assertThat(plan2) - .contains( - "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_timestamp, filter=[=(c, 2021-01-01 00:00:00:TIMESTAMP(3))], project=[a, b]]], fields=[a, b])"); - - org.apache.flink.util.CloseableIterator<Row> rowIter2 = - tEnv.executeSql( - "select * from partitioned_table_timestamp where c = TIMESTAMP '2021-01-01 00:00:00'") - .collect(); - assertResultsIgnoreOrder(rowIter2, expectedRowValues2, true); - - // Test TIMESTAMP_LTZ partition type - tEnv.executeSql( - "create table partitioned_table_timestamp_ltz" - + " (a int not null, b varchar, c timestamp_ltz(3), primary key (a, c) NOT ENFORCED) partitioned by (c) "); - TablePath tablePath1 = TablePath.of(DEFAULT_DB, "partitioned_table_timestamp_ltz"); - - tEnv.executeSql( - String.format( - "alter table partitioned_table_timestamp_ltz add partition (c=%d)", - timestamp1Millis)); - tEnv.executeSql( - String.format( - "alter table partitioned_table_timestamp_ltz add partition (c=%d)", - timestamp2Millis)); - - // Write test data manually for timestamp_ltz partitions - List<InternalRow> rows1 = new ArrayList<>(); - List<String> expectedRowValues1 = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - rows1.add(row(i, "v1", timestamp1)); - expectedRowValues1.add(String.format("+I[%d, v1, 2021-01-01T00:00:00Z]", i)); - } - for (int i = 0; i < 10; i++) { - rows1.add(row(i, "v1", timestamp2)); - } - writeRows(conn, tablePath1, rows1, false); - waitUntilAllBucketFinishSnapshot( - admin, tablePath1, 
Arrays.asList("2021-01-01-00-00-00_", "2021-01-02-00-00-00_")); - - // TODO add test for timestamp_ltz - // Test query with TIMESTAMP_LTZ literal - String plan1 = - tEnv.explainSql( - "select * from partitioned_table_timestamp_ltz where c = cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3)) "); - CloseableIterator<Row> collect = - tEnv.executeSql("select cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3))") - .collect(); - org.apache.flink.util.CloseableIterator<Row> rowIter1 = - tEnv.executeSql( - "select * from partitioned_table_timestamp_ltz where c = cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3))") - .collect(); - assertResultsIgnoreOrder(rowIter1, expectedRowValues1, true); - } - - @Test - void testStreamingReadMultiTypePartitionPushDown() throws Exception { - // Create a table with multiple partition fields of different types - tEnv.executeSql( - "create table multi_type_partitioned_table" - + " (id int not null, name varchar, year_val int, month_val varchar, day_val bigint, is_active boolean, " - + "primary key (id, year_val, month_val, day_val, is_active) NOT ENFORCED) " - + "partitioned by (year_val, month_val, day_val, is_active) "); - TablePath tablePath = TablePath.of(DEFAULT_DB, "multi_type_partitioned_table"); - - // Add partitions with different types - tEnv.executeSql( - "alter table multi_type_partitioned_table add partition (year_val=2025, month_val='01', day_val=15, is_active='true')"); - tEnv.executeSql( - "alter table multi_type_partitioned_table add partition (year_val=2025, month_val='02', day_val=20, is_active='false')"); - tEnv.executeSql( - "alter table multi_type_partitioned_table add partition (year_val=2026, month_val='01', day_val=10, is_active='true')"); - - // Write test data to partitions - List<InternalRow> rows = new ArrayList<>(); - List<String> expectedRowValues = new ArrayList<>(); - - // Data for partition (2025, '01', 15, true) - for (int i = 1; i <= 3; i++) { - rows.add(row(i, "name" + i, 2025, "01", 
15L, true)); - expectedRowValues.add(String.format("+I[%d, name%d, 2025, 01, 15, true]", i, i)); - } - - // Data for partition (2025, '02', 20, false) - for (int i = 4; i <= 6; i++) { - rows.add(row(i, "name" + i, 2025, "02", 20L, false)); - } - - // Data for partition (2026, '01', 10, true) - for (int i = 7; i <= 9; i++) { - rows.add(row(i, "name" + i, 2026, "01", 10L, true)); - } - - writeRows(conn, tablePath, rows, false); - waitUntilAllBucketFinishSnapshot( - admin, - tablePath, - Arrays.asList("2025$01$15$true", "2025$02$20$false", "2026$01$10$true")); - - // Test 1: Filter by integer partition field - String plan1 = - tEnv.explainSql("select * from multi_type_partitioned_table where year_val = 2025"); - assertThat(plan1) - .contains( - "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(year_val, 2025)], project=[id, name, month_val, day_val, is_active]]], fields=[id, name, month_val, day_val, is_active])"); - - org.apache.flink.util.CloseableIterator<Row> rowIter1 = - tEnv.executeSql("select * from multi_type_partitioned_table where year_val = 2025") - .collect(); - - List<String> expected1 = new ArrayList<>(); - expected1.addAll(expectedRowValues); // Data from (2025, '01', 15, true) - for (int i = 4; i <= 6; i++) { - expected1.add(String.format("+I[%d, name%d, 2025, 02, 20, false]", i, i)); - } - assertResultsIgnoreOrder(rowIter1, expected1, true); - - // Test 2: Filter by string partition field - String plan2 = - tEnv.explainSql( - "select * from multi_type_partitioned_table where month_val = '01'"); - assertThat(plan2) - .contains( - "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(month_val, _UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], project=[id, name, year_val, day_val, is_active]]], fields=[id, name, year_val, day_val, is_active])"); - - org.apache.flink.util.CloseableIterator<Row> rowIter2 = - tEnv.executeSql("select * from multi_type_partitioned_table 
where month_val = '01'") - .collect(); - - List<String> expected2 = new ArrayList<>(); - expected2.addAll(expectedRowValues); // Data from (2025, '01', 15, true) - for (int i = 7; i <= 9; i++) { - expected2.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", i, i)); - } - assertResultsIgnoreOrder(rowIter2, expected2, true); - - // Test 3: Filter by bigint partition field - String plan3 = - tEnv.explainSql("select * from multi_type_partitioned_table where day_val = 15"); - assertThat(plan3) - .contains( - "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(day_val, 15:BIGINT)], project=[id, name, year_val, month_val, is_active]]], fields=[id, name, year_val, month_val, is_active])"); - - org.apache.flink.util.CloseableIterator<Row> rowIter3 = - tEnv.executeSql("select * from multi_type_partitioned_table where day_val = 15") - .collect(); - assertResultsIgnoreOrder(rowIter3, expectedRowValues, true); - - // Test 4: Filter by boolean partition field - String plan4 = - tEnv.explainSql( - "select * from multi_type_partitioned_table where is_active is true"); - assertThat(plan4) - .contains( - "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[is_active]]], fields=[id, name, year_val, month_val, day_val, is_active]"); - - org.apache.flink.util.CloseableIterator<Row> rowIter4 = - tEnv.executeSql("select * from multi_type_partitioned_table where is_active = true") - .collect(); - - List<String> expected4 = new ArrayList<>(); - expected4.addAll(expectedRowValues); // Data from (2025, '01', 15, true) - for (int i = 7; i <= 9; i++) { - expected4.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", i, i)); - } - assertResultsIgnoreOrder(rowIter4, expected4, true); - - // Test 5: Complex filter with multiple partition fields - String plan5 = - tEnv.explainSql( - "select * from multi_type_partitioned_table where year_val = 2025 and month_val = '01' and is_active = true"); - assertThat(plan5) - 
.contains( - "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[and(and(=(year_val, 2025), =(month_val, _UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), is_active)], project=[id, name, day_val, is_active]]], fields=[id, name, day_val, is_active])"); - - org.apache.flink.util.CloseableIterator<Row> rowIter5 = - tEnv.executeSql( - "select * from multi_type_partitioned_table where year_val = 2025 and month_val = '01' and is_active = true") - .collect(); - assertResultsIgnoreOrder(rowIter5, expectedRowValues, true); - } - @Test void testStreamingReadMultiPartitionPushDown() throws Exception { tEnv.executeSql( @@ -1720,8 +1518,6 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase { for (String partition : partitions) { KvSnapshots snapshots = admin.getLatestKvSnapshots(tablePath, partition).get(); - List<PartitionInfo> partitionInfos = - admin.listPartitionInfos(tablePath).get(); for (int bucketId : snapshots.getBucketIds()) { if (!snapshots.getSnapshotId(bucketId).isPresent()) { return false; diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java index 633443a08..d76fe2369 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java @@ -257,7 +257,7 @@ public class FlinkTestBase extends AbstractTestBase { } public static List<String> writeRowsToPartition( - Connection connection, TablePath tablePath, Collection<Object> partitions) + Connection connection, TablePath tablePath, Collection<String> partitions) throws Exception { List<InternalRow> rows = new ArrayList<>(); List<String> expectedRowValues = new ArrayList<>(); diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java index 13eaf5b86..76c7f5d68 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java @@ -27,6 +27,7 @@ import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericRow; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.FieldReferenceExpression; import org.apache.flink.table.expressions.ResolvedExpression; @@ -52,7 +53,6 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import static org.apache.flink.table.api.DataTypes.STRING; -import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -579,6 +579,18 @@ public class PredicateConverterTest { return new FieldReferenceExpression("f" + i, type, Integer.MAX_VALUE, Integer.MAX_VALUE); } + private static ValueLiteralExpression literal(Object v, DataType type) { + if (v != null) { + return ApiExpressionUtils.valueLiteral(v, type.notNull()); + } else { + return ApiExpressionUtils.valueLiteral(null, type.nullable()); + } + } + + private static ValueLiteralExpression literal(Object v) { + return new ValueLiteralExpression(v); + } + private static CallExpression call(FunctionDefinition function, ResolvedExpression... 
args) { return new CallExpression(false, null, function, Arrays.asList(args), DataTypes.BOOLEAN()); } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index b79232fad..c4fa15e05 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -394,6 +394,7 @@ <exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude> <exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude> <exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude> + <exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude> <exclude> org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator </exclude> diff --git a/website/docs/engine-flink/reads.md b/website/docs/engine-flink/reads.md index 09c65e2ab..bcb2c2475 100644 --- a/website/docs/engine-flink/reads.md +++ b/website/docs/engine-flink/reads.md @@ -87,9 +87,24 @@ Partition pruning is an optimization technique for Fluss partitioned tables. It This optimization is especially useful in streaming scenarios for [Multi-Field Partitioned Tables](table-design/data-distribution/partitioning.md#multi-field-partitioned-tables) that has many partitions. The partition pruning also supports dynamically pruning new created partitions during streaming read. -:::note -1. Currently, **only equality conditions** (e.g., `c_nationkey = 'US'`) are supported for partition pruning. Operators like `<`, `>`, `OR`, and `IN` are not yet supported. 
-::: +The supported filter operators for partition pruning on the partition fields are: +- `=` +- `>` +- `<` +- `>=` +- `<=` +- `<>` +- `IN (...)` +- `NOT IN (...)` +- `IS NULL` +- `IS NOT NULL` +- `IS TRUE` +- `IS FALSE` +- `LIKE 'abc%'` for prefix matching +- `LIKE '%abc'` for suffix matching +- `LIKE '%abc%'` for substring matching +- OR disjunctions of filter conditions +- AND conjunctions of filter conditions #### Example diff --git a/website/src/css/custom.css b/website/src/css/custom.css index 6efba04d8..52bebc647 100644 --- a/website/src/css/custom.css +++ b/website/src/css/custom.css @@ -131,7 +131,17 @@ color: #4c576c; } } - + + li { + code { + border-radius: 4px; + background-color: #edf2fa; + border: none; + padding: 3px 4px; + font-size: 14px; + color: #4c576c; + } + } /* pre { code {
