This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 680ca7e69f17ca3cf6a041ff53a5170266767219
Author: Jark Wu <[email protected]>
AuthorDate: Tue Sep 23 00:16:02 2025 +0800

    [flink] Improve the implementation for general predicates partition 
pushdown and fix bugs
---
 .../java/org/apache/fluss/predicate/Contains.java  |   4 +-
 .../java/org/apache/fluss/predicate/EndsWith.java  |   5 +-
 .../org/apache/fluss/predicate/LeafPredicate.java  |   9 -
 .../fluss/predicate/PartitionPredicateVisitor.java |   5 +-
 .../org/apache/fluss/predicate/StartsWith.java     |   5 +-
 .../org/apache/fluss/predicate/PredicateTest.java  |   6 +-
 .../flink/source/Flink118TableSourceITCase.java    |  11 +-
 fluss-flink/fluss-flink-common/pom.xml             |  16 --
 .../apache/fluss/flink/row/FlinkAsFlussRow.java    |  11 -
 .../fluss/flink/source/FlinkTableSource.java       |  91 ++-----
 .../fluss/flink/utils/PredicateConverter.java      |  82 +++---
 .../flink/utils/StringifyPredicateVisitor.java     |  72 +++++
 .../fluss/flink/source/FlinkTableSourceITCase.java | 300 ++++-----------------
 .../apache/fluss/flink/utils/FlinkTestBase.java    |   2 +-
 .../fluss/flink/utils/PredicateConverterTest.java  |  14 +-
 fluss-test-coverage/pom.xml                        |   1 +
 website/docs/engine-flink/reads.md                 |  21 +-
 website/src/css/custom.css                         |  12 +-
 18 files changed, 244 insertions(+), 423 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java 
b/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java
index 446a1490c..b50e575dc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java
+++ b/fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.predicate;
 
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.types.DataType;
 
 import java.util.List;
@@ -37,8 +38,7 @@ public class Contains extends NullFalseLeafBinaryFunction {
 
     @Override
     public boolean test(DataType type, Object field, Object patternLiteral) {
-        String fieldString = field.toString();
-        return fieldString.contains((String) patternLiteral);
+        return ((BinaryString) field).contains((BinaryString) patternLiteral);
     }
 
     @Override
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java 
b/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java
index 654cecd3f..8f3d423a2 100644
--- a/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java
+++ b/fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.predicate;
 
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.types.DataType;
 
 import java.util.List;
@@ -40,8 +41,8 @@ public class EndsWith extends NullFalseLeafBinaryFunction {
 
     @Override
     public boolean test(DataType type, Object field, Object patternLiteral) {
-        String fieldString = field.toString();
-        return fieldString.endsWith((String) patternLiteral);
+        BinaryString fieldString = (BinaryString) field;
+        return fieldString.endsWith((BinaryString) patternLiteral);
     }
 
     @Override
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java 
b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java
index 640c5093b..17528f74e 100644
--- a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java
+++ b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java
@@ -19,7 +19,6 @@ package org.apache.fluss.predicate;
 
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.DecimalType;
 import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.TimestampType;
@@ -81,14 +80,6 @@ public class LeafPredicate implements Predicate {
         return literals;
     }
 
-    public LeafPredicate copyWithNewIndex(int fieldIndex) {
-        return new LeafPredicate(function, type, fieldIndex, fieldName, 
literals);
-    }
-
-    public LeafPredicate copyWithNewLiterals(List<Object> literals) {
-        return new LeafPredicate(function, DataTypes.STRING(), fieldIndex, 
fieldName, literals);
-    }
-
     @Override
     public boolean test(InternalRow row) {
         return function.test(type, get(row, fieldIndex, type), literals);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java
 
b/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java
index 284451b74..47957a058 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java
@@ -23,7 +23,10 @@ import java.util.List;
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
 
-/** Visit the predicate and check if it only contains partition key's 
predicate. */
+/**
+ * Visit the predicate and check if it only contains partition key's 
predicate. Returns false if it
+ * contains any predicates that filter on non-partition fields.
+ */
 public class PartitionPredicateVisitor implements PredicateVisitor<Boolean> {
 
     private final List<String> partitionKeys;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java 
b/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java
index ce894c69d..c79ac590c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java
+++ b/fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.predicate;
 
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.types.DataType;
 
 import java.util.List;
@@ -40,8 +41,8 @@ public class StartsWith extends NullFalseLeafBinaryFunction {
 
     @Override
     public boolean test(DataType type, Object field, Object patternLiteral) {
-        String fieldString = field.toString();
-        return fieldString.startsWith((String) patternLiteral);
+        BinaryString fieldString = (BinaryString) field;
+        return fieldString.startsWith((BinaryString) patternLiteral);
     }
 
     @Override
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java 
b/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java
index 5899a4425..f6284eab9 100644
--- a/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java
@@ -325,7 +325,7 @@ public class PredicateTest {
     @Test
     public void testEndsWith() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new 
StringType()));
-        Predicate predicate = builder.endsWith(0, ("bcc"));
+        Predicate predicate = builder.endsWith(0, fromString("bcc"));
         GenericRow row = GenericRow.of(fromString("aabbcc"));
         assertThat(predicate.test(row)).isEqualTo(true);
 
@@ -339,7 +339,7 @@ public class PredicateTest {
     @Test
     public void testStartWith() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new 
StringType()));
-        Predicate predicate = builder.startsWith(0, ("aab"));
+        Predicate predicate = builder.startsWith(0, fromString("aab"));
         GenericRow row = GenericRow.of(fromString("aabbcc"));
         assertThat(predicate.test(row)).isEqualTo(true);
 
@@ -357,7 +357,7 @@ public class PredicateTest {
     @Test
     public void testContainsWith() {
         PredicateBuilder builder = new PredicateBuilder(RowType.of(new 
StringType()));
-        Predicate predicate = builder.contains(0, ("def"));
+        Predicate predicate = builder.contains(0, fromString("def"));
         GenericRow row1 = GenericRow.of(fromString("aabbdefcc"));
         GenericRow row2 = GenericRow.of(fromString("aabbdcefcc"));
         assertThat(predicate.test(row1)).isEqualTo(true);
diff --git 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java
 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java
index 195e87b04..0027a6ff8 100644
--- 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/source/Flink118TableSourceITCase.java
@@ -17,5 +17,14 @@
 
 package org.apache.fluss.flink.source;
 
+import org.junit.jupiter.api.Disabled;
+
 /** IT case for {@link FlinkTableSource} in Flink 1.18. */
-public class Flink118TableSourceITCase extends FlinkTableSourceITCase {}
+class Flink118TableSourceITCase extends FlinkTableSourceITCase {
+
+    @Disabled("Flink 1.18 has a bug in timestamp_ltz type")
+    @Override
+    void testStreamingReadAllPartitionTypePushDown() throws Exception {
+        super.testStreamingReadAllPartitionTypePushDown();
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/pom.xml 
b/fluss-flink/fluss-flink-common/pom.xml
index aa22e2373..c7d59a7f9 100644
--- a/fluss-flink/fluss-flink-common/pom.xml
+++ b/fluss-flink/fluss-flink-common/pom.xml
@@ -34,7 +34,6 @@
     <properties>
         <flink.major.version>1.20</flink.major.version>
         <flink.minor.version>1.20.1</flink.minor.version>
-        <scala.binary.version>2.12</scala.binary.version>
     </properties>
 
     <dependencies>
@@ -127,21 +126,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.minor.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
-            <version>${flink.minor.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-server</artifactId>
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
index 3593d1f10..4b48a4ef4 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
@@ -17,7 +17,6 @@
 
 package org.apache.fluss.flink.row;
 
-import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
@@ -25,10 +24,8 @@ import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.DataType;
 
 /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
 public class FlinkAsFlussRow implements InternalRow {
@@ -135,12 +132,4 @@ public class FlinkAsFlussRow implements InternalRow {
     public byte[] getBytes(int pos) {
         return flinkRow.getBinary(pos);
     }
-
-    public static Object fromFlinkObject(Object o, DataType type) {
-        if (o == null) {
-            return null;
-        }
-        return 
InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
-                .getFieldOrNull((new 
FlinkAsFlussRow()).replace(GenericRowData.of(o)));
-    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 3c4464724..c278e1572 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -26,7 +26,6 @@ import 
org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
 import org.apache.fluss.flink.source.lookup.LookupNormalizer;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
-import org.apache.fluss.flink.utils.PredicateConverter;
 import org.apache.fluss.flink.utils.PushdownUtils;
 import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
@@ -39,11 +38,9 @@ import org.apache.fluss.predicate.PartitionPredicateVisitor;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.predicate.PredicateVisitor;
-import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
-import org.apache.fluss.utils.PartitionUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -91,11 +88,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
+import static 
org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate;
 import static 
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
 import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
+import static 
org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.fluss.utils.Preconditions.checkState;
@@ -506,43 +504,32 @@ public class FlinkTableSource
             }
             singleRowFilter = lookupRow;
             return Result.of(acceptedFilters, remainingFilters);
-        } else if (isPartitioned()
-                && 
!RowLevelModificationType.UPDATE.equals(modificationScanType)) {
+        } else if (isPartitioned()) {
             // apply partition filter pushdown
             List<Predicate> converted = new ArrayList<>();
 
-            List<String> fieldNames = tableOutputType.getFieldNames();
-            List<String> partitionKeys =
-                    Arrays.stream(partitionKeyIndexes)
-                            .mapToObj(fieldNames::get)
-                            .collect(Collectors.toList());
+            RowType partitionRowType =
+                    
FlinkConversions.toFlussRowType(tableOutputType).project(partitionKeyIndexes);
+            PredicateVisitor<Boolean> checksOnlyPartitionKeys =
+                    new 
PartitionPredicateVisitor(partitionRowType.getFieldNames());
 
-            PredicateVisitor<Boolean> partitionPredicateVisitor =
-                    new PartitionPredicateVisitor(partitionKeys);
-
-            LogicalType[] partitionKeyTypes =
-                    Arrays.stream(partitionKeyIndexes)
-                            .mapToObj(producedDataType.getChildren()::get)
-                            .toArray(LogicalType[]::new);
             for (ResolvedExpression filter : filters) {
 
                 Optional<Predicate> predicateOptional =
-                        PredicateConverter.convert(
-                                
org.apache.flink.table.types.logical.RowType.of(
-                                        partitionKeyTypes, 
partitionKeys.toArray(new String[0])),
-                                filter);
+                        convertToFlussPredicate(partitionRowType, filter);
 
                 if (predicateOptional.isPresent()) {
                     Predicate p = predicateOptional.get();
-                    if (!p.visit(partitionPredicateVisitor)) {
+                    // partition pushdown can only guarantee to filter out 
partitions matching the
+                    // predicate, but can't guarantee to filter out all data 
matching the
+                    // non-partition filters within those partitions
+                    if (!p.visit(checksOnlyPartitionKeys)) {
                         remainingFilters.add(filter);
                     } else {
                         acceptedFilters.add(filter);
                     }
-                    // Convert literals in the predicate to string using
-                    // PartitionUtils.convertValueOfType
-                    p = stringifyPredicate(p);
-                    converted.add(p);
+                    // Convert literals in the predicate to partition string
+                    converted.add(stringifyPartitionPredicate(p));
                 } else {
                     remainingFilters.add(filter);
                 }
@@ -553,7 +540,7 @@ public class FlinkTableSource
                 List<Predicate> lakePredicates = new ArrayList<>();
                 for (ResolvedExpression filter : filters) {
                     Optional<Predicate> predicateOptional =
-                            PredicateConverter.convert(tableOutputType, 
filter);
+                            convertToFlussPredicate(tableOutputType, filter);
                     predicateOptional.ifPresent(lakePredicates::add);
                 }
 
@@ -631,24 +618,6 @@ public class FlinkTableSource
         return pkTypes;
     }
 
-    private Map<Integer, LogicalType> getPartitionKeyTypes() {
-        Map<Integer, LogicalType> partitionKeyTypes = new HashMap<>();
-        for (int index : partitionKeyIndexes) {
-            partitionKeyTypes.put(index, tableOutputType.getTypeAt(index));
-        }
-        return partitionKeyTypes;
-    }
-
-    private List<FieldEqual> stringifyFieldEquals(List<FieldEqual> 
fieldEquals) {
-        List<FieldEqual> serialize = new ArrayList<>();
-        for (FieldEqual fieldEqual : fieldEquals) {
-            // revisit this again when we support more data types for 
partition key
-            serialize.add(
-                    new FieldEqual(fieldEqual.fieldIndex, 
(fieldEqual.equalValue).toString()));
-        }
-        return serialize;
-    }
-
     // projection from pk_field_index to index_in_pk
     private int[] getKeyRowProjection() {
         int[] projection = new int[tableOutputType.getFieldCount()];
@@ -673,34 +642,4 @@ public class FlinkTableSource
     public int[] getBucketKeyIndexes() {
         return bucketKeyIndexes;
     }
-
-    @VisibleForTesting
-    public int[] getPartitionKeyIndexes() {
-        return partitionKeyIndexes;
-    }
-
-    /**
-     * Converts literals in LeafPredicate to string representation using
-     * PartitionUtils.convertValueOfType. This is necessary because partition 
metadata is stored as
-     * string.
-     */
-    private Predicate stringifyPredicate(Predicate predicate) {
-        if (predicate instanceof LeafPredicate) {
-            // Convert literals to string using 
PartitionUtils.convertValueOfType
-            List<Object> convertedLiterals = new ArrayList<>();
-            for (Object literal : ((LeafPredicate) predicate).literals()) {
-                if (literal != null) {
-                    String stringValue =
-                            PartitionUtils.convertValueOfType(
-                                    literal, ((LeafPredicate) 
predicate).type().getTypeRoot());
-                    
convertedLiterals.add(BinaryString.fromString(stringValue));
-                } else {
-                    convertedLiterals.add(null);
-                }
-            }
-            return ((LeafPredicate) 
predicate).copyWithNewLiterals(convertedLiterals);
-        }
-
-        return predicate;
-    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
index 1a7939a14..861381c14 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
@@ -17,12 +17,16 @@
 
 package org.apache.fluss.flink.utils;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.flink.row.FlinkAsFlussRow;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.predicate.UnsupportedExpression;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.utils.TypeUtils;
 
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
@@ -34,7 +38,6 @@ import 
org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.RowType;
@@ -62,14 +65,15 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
 
     private final PredicateBuilder builder;
 
-    public PredicateConverter(RowType type) {
-        this(new PredicateBuilder(FlinkConversions.toFlussRowType(type)));
-    }
-
     public PredicateConverter(PredicateBuilder builder) {
         this.builder = builder;
     }
 
+    @VisibleForTesting
+    PredicateConverter(RowType type) {
+        this(new PredicateBuilder(FlinkConversions.toFlussRowType(type)));
+    }
+
     /** Accepts simple LIKE patterns like "abc%". */
     private static final Pattern BEGIN_PATTERN = Pattern.compile("^[^%_]+%$");
 
@@ -85,8 +89,6 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
             return PredicateBuilder.and(children.get(0).accept(this), 
children.get(1).accept(this));
         } else if (func == BuiltInFunctionDefinitions.OR) {
             return PredicateBuilder.or(children.get(0).accept(this), 
children.get(1).accept(this));
-        } else if (func == BuiltInFunctionDefinitions.NOT) {
-            return visitNotFunction(children, builder::equal, builder::equal);
         } else if (func == BuiltInFunctionDefinitions.EQUALS) {
             return visitBiFunction(children, builder::equal, builder::equal);
         } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
@@ -119,6 +121,12 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
                     .map(builder::indexOf)
                     .map(builder::isNotNull)
                     .orElseThrow(UnsupportedExpression::new);
+        } else if (func == BuiltInFunctionDefinitions.NOT) {
+            return extractFieldReference(children.get(0))
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
+                    .map(idx -> builder.equal(idx, Boolean.FALSE))
+                    .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.BETWEEN) {
             FieldReferenceExpression fieldRefExpr =
                     
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
@@ -151,22 +159,28 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
                 if (escape == null) {
                     if (BEGIN_PATTERN.matcher(sqlPattern).matches()) {
                         String prefix = sqlPattern.substring(0, 
sqlPattern.length() - 1);
-                        return 
builder.startsWith(builder.indexOf(fieldRefExpr.getName()), prefix);
+                        return builder.startsWith(
+                                builder.indexOf(fieldRefExpr.getName()),
+                                BinaryString.fromString(prefix));
                     }
                     if (END_PATTERN.matcher(sqlPattern).matches()) {
                         String suffix = sqlPattern.substring(1);
-                        return 
builder.endsWith(builder.indexOf(fieldRefExpr.getName()), suffix);
+                        return builder.endsWith(
+                                builder.indexOf(fieldRefExpr.getName()),
+                                BinaryString.fromString(suffix));
                     }
                     if (CONTAINS_PATTERN.matcher(sqlPattern).matches()
                             && sqlPattern.indexOf('%', 1) == 
sqlPattern.length() - 1) {
                         String mid = sqlPattern.substring(1, 
sqlPattern.length() - 1);
-                        return 
builder.contains(builder.indexOf(fieldRefExpr.getName()), mid);
+                        return builder.contains(
+                                builder.indexOf(fieldRefExpr.getName()),
+                                BinaryString.fromString(mid));
                     }
                 }
             }
         }
 
-        // TODO is_xxx, between_xxx, similar, in, not_in, not?
+        // TODO is_true, is_false, between_xxx, similar, not_in?
 
         throw new UnsupportedExpression();
     }
@@ -192,24 +206,6 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
         throw new UnsupportedExpression();
     }
 
-    private Predicate visitNotFunction(
-            List<Expression> children,
-            BiFunction<Integer, Object, Predicate> visit1,
-            BiFunction<Integer, Object, Predicate> visit2) {
-        Optional<FieldReferenceExpression> fieldRefExpr = 
extractFieldReference(children.get(0));
-        if (fieldRefExpr.isPresent() && 
builder.indexOf(fieldRefExpr.get().getName()) != -1) {
-
-            return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), 
false);
-        } else {
-            fieldRefExpr = extractFieldReference(children.get(1));
-            if (fieldRefExpr.isPresent()) {
-                return 
visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), false);
-            }
-        }
-
-        throw new UnsupportedExpression();
-    }
-
     private Optional<FieldReferenceExpression> 
extractFieldReference(Expression expression) {
         if (expression instanceof FieldReferenceExpression) {
             return Optional.of((FieldReferenceExpression) expression);
@@ -235,7 +231,7 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
             if (valueOpt.isPresent()) {
                 Object value = valueOpt.get();
                 if 
(actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) {
-                    return FlinkAsFlussRow.fromFlinkObject(
+                    return fromFlinkObject(
                             DataStructureConverters.getConverter(expectedType)
                                     .toInternalOrNull(value),
                             expectedType);
@@ -252,14 +248,12 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
         throw new UnsupportedExpression();
     }
 
-    private boolean isStringType(LogicalType type) {
-        switch (type.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-                return true;
-            default:
-                return false;
+    private static Object fromFlinkObject(Object o, DataType type) {
+        if (o == null) {
+            return null;
         }
+        return 
InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
+                .getFieldOrNull((new 
FlinkAsFlussRow()).replace(GenericRowData.of(o)));
     }
 
     private boolean supportsPredicate(LogicalType type) {
@@ -296,9 +290,6 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
 
     @Override
     public Predicate visit(FieldReferenceExpression fieldReferenceExpression) {
-        if (fieldReferenceExpression.getOutputDataType().getLogicalType() 
instanceof BooleanType) {
-            return 
builder.equal(builder.indexOf(fieldReferenceExpression.getName()), true);
-        }
         throw new UnsupportedExpression();
     }
 
@@ -318,11 +309,18 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
      * @param filter a resolved expression
      * @return {@link Predicate} if no {@link UnsupportedExpression} thrown.
      */
-    public static Optional<Predicate> convert(RowType rowType, 
ResolvedExpression filter) {
+    public static Optional<Predicate> convertToFlussPredicate(
+            org.apache.fluss.types.RowType rowType, ResolvedExpression filter) 
{
         try {
-            return Optional.ofNullable(filter.accept(new 
PredicateConverter(rowType)));
+            return Optional.ofNullable(
+                    filter.accept(new PredicateConverter(new 
PredicateBuilder(rowType))));
         } catch (UnsupportedExpression e) {
             return Optional.empty();
         }
     }
+
+    public static Optional<Predicate> convertToFlussPredicate(
+            RowType rowType, ResolvedExpression filter) {
+        return 
convertToFlussPredicate(FlinkConversions.toFlussRowType(rowType), filter);
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java
new file mode 100644
index 000000000..1d0f39457
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/StringifyPredicateVisitor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateVisitor;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.PartitionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link PredicateVisitor} that converts all literals in {@link 
LeafPredicate} to string type in
+ * the string format of {@link PartitionUtils#convertValueOfType(Object, 
DataTypeRoot)}. This is
+ * necessary because partition metadata is stored as string.
+ */
+public class StringifyPredicateVisitor implements PredicateVisitor<Predicate> {
+
+    public static Predicate stringifyPartitionPredicate(Predicate predicate) {
+        StringifyPredicateVisitor visitor = new StringifyPredicateVisitor();
+        return predicate.visit(visitor);
+    }
+
+    @Override
+    public Predicate visit(LeafPredicate predicate) {
+        List<Object> convertedLiterals = new ArrayList<>();
+        for (Object literal : predicate.literals()) {
+            if (literal != null) {
+                String stringValue =
+                        PartitionUtils.convertValueOfType(literal, 
predicate.type().getTypeRoot());
+                convertedLiterals.add(BinaryString.fromString(stringValue));
+            } else {
+                convertedLiterals.add(null);
+            }
+        }
+        return new LeafPredicate(
+                predicate.function(),
+                DataTypes.STRING(),
+                predicate.index(),
+                predicate.fieldName(),
+                convertedLiterals);
+    }
+
+    @Override
+    public Predicate visit(CompoundPredicate predicate) {
+        List<Predicate> newChildren = new ArrayList<>();
+        for (Predicate child : predicate.children()) {
+            newChildren.add(child.visit(this));
+        }
+        return new CompoundPredicate(predicate.function(), newChildren);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 03bdc836c..f9577af97 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -25,12 +25,9 @@ import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.TimestampLtz;
-import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.clock.ManualClock;
 
@@ -607,8 +604,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         }
 
         List<String> expectedRowValues =
-                writeRowsToPartition(
-                        conn, tablePath, 
Collections.singleton(partitionNameById.values()));
+                writeRowsToPartition(conn, tablePath, 
partitionNameById.values());
         waitUntilAllBucketFinishSnapshot(admin, tablePath, 
partitionNameById.values());
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -966,261 +962,63 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
     }
 
     @Test
-    void testStreamingReadDatePartitionTypePushDown() throws Exception {
+    void testStreamingReadAllPartitionTypePushDown() throws Exception {
         tEnv.executeSql(
-                "create table partitioned_table_long"
-                        + " (a int not null, b varchar, c date, primary key 
(a, c) NOT ENFORCED) partitioned by (c) ");
-        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"partitioned_table_long");
-        tEnv.executeSql("alter table partitioned_table_long add partition 
(c=4)");
-        tEnv.executeSql("alter table partitioned_table_long add partition 
(c=5)");
+                "CREATE TABLE all_type_partitioned_table"
+                        + " (id INT, "
+                        + "  p_bool BOOLEAN, p_int INT, p_bigint BIGINT, "
+                        + "  p_bytes BYTES, p_string STRING, "
+                        + "  p_float FLOAT, p_double DOUBLE, "
+                        + "  p_date DATE, p_time TIME, p_ts_ntz TIMESTAMP, "
+                        + "  p_ts_ltz TIMESTAMP WITH LOCAL TIME ZONE) "
+                        + "PARTITIONED BY (p_bool, p_int, p_bigint, p_bytes, 
p_string, "
+                        + "  p_float, p_double, p_date, p_time, p_ts_ntz, 
p_ts_ltz) ");
+        tEnv.executeSql(
+                        "INSERT INTO all_type_partitioned_table VALUES "
+                                + "(1, false, 10, 99999, CAST('Hi' AS 
VARBINARY), 'hello', 12.5,  7.88,   DATE '2025-10-12', TIME '12:55:00', 
TIMESTAMP '2025-10-12 12:55:00.001', TO_TIMESTAMP_LTZ(4001, 3)), "
+                                + "(2, true,  12, 99998, CAST('Hi' AS 
VARBINARY), 'world', 13.6,  8.99,   DATE '2025-10-11', TIME '12:55:12', 
TIMESTAMP '2025-10-12 12:55:01.001', TO_TIMESTAMP_LTZ(5001, 3)), "
+                                + "(3, false, 13, 99997, CAST('Hi' AS 
VARBINARY), 'Hi',    17.44, 4.444,  DATE '2025-10-10', TIME '12:55:32', 
TIMESTAMP '2025-10-12 12:55:02.001', TO_TIMESTAMP_LTZ(6001, 3)), "
+                                + "(4, true,  14, 99996, CAST('Hi' AS 
VARBINARY), 'Ciao',  11.0,  9.4211, DATE '2025-10-09', TIME '12:00:44', 
TIMESTAMP '2025-10-12 12:55:03.001', TO_TIMESTAMP_LTZ(7001, 3)); ")
+                .await();
 
-        List<String> expectedRowValues =
-                writeRowsToPartition(conn, tablePath, Arrays.asList(4, 
5)).stream()
-                        .filter(s -> s.endsWith("4]"))
-                        .map(s -> s.replace("4]", "1970-01-05]"))
-                        .collect(Collectors.toList());
-        waitUntilAllBucketFinishSnapshot(
-                admin, tablePath, Arrays.asList("1970-01-05", "1970-01-06"));
+        List<String> expectedShowPartitionsResult =
+                Arrays.asList(
+                        
"+I[p_bool=false/p_int=10/p_bigint=99999/p_bytes=4869/p_string=hello/p_float=12_5/p_double=7_88/p_date=2025-10-12/p_time=12-55-00_000/p_ts_ntz=2025-10-12-12-55-00_001/p_ts_ltz=1970-01-01-00-00-04_001]",
+                        
"+I[p_bool=true/p_int=12/p_bigint=99998/p_bytes=4869/p_string=world/p_float=13_6/p_double=8_99/p_date=2025-10-11/p_time=12-55-12_000/p_ts_ntz=2025-10-12-12-55-01_001/p_ts_ltz=1970-01-01-00-00-05_001]",
+                        
"+I[p_bool=false/p_int=13/p_bigint=99997/p_bytes=4869/p_string=Hi/p_float=17_44/p_double=4_444/p_date=2025-10-10/p_time=12-55-32_000/p_ts_ntz=2025-10-12-12-55-02_001/p_ts_ltz=1970-01-01-00-00-06_001]",
+                        
"+I[p_bool=true/p_int=14/p_bigint=99996/p_bytes=4869/p_string=Ciao/p_float=11_0/p_double=9_4211/p_date=2025-10-09/p_time=12-00-44_000/p_ts_ntz=2025-10-12-12-55-03_001/p_ts_ltz=1970-01-01-00-00-07_001]");
+        CloseableIterator<Row> showPartitionIterator =
+                tEnv.executeSql("show partitions 
all_type_partitioned_table").collect();
+        assertResultsIgnoreOrder(showPartitionIterator, 
expectedShowPartitionsResult, true);
 
-        String plan =
-                tEnv.explainSql("select * from partitioned_table_long where c 
=DATE'1970-01-05'");
+        String query =
+                "select * from all_type_partitioned_table where "
+                        + "p_bool is false and p_int not in (12) and p_bigint 
in (99999) and p_bytes=CAST('Hi' AS VARBINARY) "
+                        + "and p_string='hello' and p_float=CAST(12.5 AS 
FLOAT) and p_double=7.88 and p_date=DATE '2025-10-12' "
+                        + "and p_time=TIME '12:55:00' and p_ts_ntz=TIMESTAMP 
'2025-10-12 12:55:00.001' "
+                        + "and p_ts_ltz=TO_TIMESTAMP_LTZ(4001, 3)";
+        String plan = tEnv.explainSql(query);
         assertThat(plan)
                 .contains(
-                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_long, filter=[=(c, 1970-01-05)], project=[a, b]]], fields=[a, 
b])");
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
all_type_partitioned_table, "
+                                + 
"filter=[and(and(and(and(and(and(and(and(and(and(<>(p_int, 12), "
+                                + "=(p_bigint, 99999:BIGINT)), =(p_bytes, 
X'4869':VARBINARY(2147483647))), "
+                                + "=(p_string, 
_UTF-16LE'hello':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), "
+                                + "=(p_float, 1.25E1:FLOAT)), =(p_double, 
7.88E0:DOUBLE)), =(p_date, 2025-10-12)), "
+                                + "=(p_time, 12:55:00)), =(p_ts_ntz, 
2025-10-12 12:55:00.001:TIMESTAMP(6))), "
+                                + "=(p_ts_ltz, 1970-01-01 
00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))], "
+                                + "project=[id, p_bool, p_int]]], fields=[id, 
p_bool, p_int])")
+                // all filter conditions should be pushed down
+                .doesNotContain("where=");
 
-        org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql("select * from partitioned_table_long where c 
=DATE'1970-01-05'")
-                        .collect();
+        List<String> expectedRowValues =
+                Collections.singletonList(
+                        "+I[1, false, 10, 99999, [72, 105], hello, 12.5, 7.88, 
2025-10-12, 12:55, 2025-10-12T12:55:00.001, 1970-01-01T00:00:04.001Z]");
+        org.apache.flink.util.CloseableIterator<Row> rowIter = 
tEnv.executeSql(query).collect();
 
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
-    @Test
-    void testStreamingReadTimestampPartitionTypePushDown() throws Exception {
-        // Add partitions with timestamp values (using epoch milliseconds)
-        long timestamp1Millis = 1609459200000L; // 2021-01-01 00:00:00 UTC
-        long timestamp2Millis = 1609545600000L; // 2021-01-02 00:00:00 UTC
-        TimestampLtz timestamp1 = 
TimestampLtz.fromEpochMillis(timestamp1Millis);
-        TimestampLtz timestamp2 = 
TimestampLtz.fromEpochMillis(timestamp2Millis);
-
-        // Test TIMESTAMP partition type
-        tEnv.executeSql(
-                "create table partitioned_table_timestamp"
-                        + " (a int not null, b varchar, c timestamp(3), 
primary key (a, c) NOT ENFORCED) partitioned by (c) ");
-        TablePath tablePath2 = TablePath.of(DEFAULT_DB, 
"partitioned_table_timestamp");
-
-        // Add partitions with timestamp values
-        TimestampNtz timestampNtz1 = TimestampNtz.fromMillis(timestamp1Millis);
-        TimestampNtz timestampNtz2 = TimestampNtz.fromMillis(timestamp2Millis);
-
-        tEnv.executeSql(
-                String.format(
-                        "alter table partitioned_table_timestamp add partition 
(c=%d)",
-                        timestamp1Millis));
-        tEnv.executeSql(
-                String.format(
-                        "alter table partitioned_table_timestamp add partition 
(c=%d)",
-                        timestamp2Millis));
-
-        // Write test data manually for timestamp partitions
-        List<InternalRow> rows2 = new ArrayList<>();
-        List<String> expectedRowValues2 = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            rows2.add(row(i, "v1", timestampNtz1));
-            expectedRowValues2.add(String.format("+I[%d, v1, 
2021-01-01T00:00]", i));
-        }
-        for (int i = 0; i < 10; i++) {
-            rows2.add(row(i, "v1", timestampNtz2));
-        }
-        writeRows(conn, tablePath2, rows2, false);
-        waitUntilAllBucketFinishSnapshot(
-                admin, tablePath2, Arrays.asList("2021-01-01-00-00-00_", 
"2021-01-02-00-00-00_"));
-
-        // Test query with TIMESTAMP literal
-        String plan2 =
-                tEnv.explainSql(
-                        "select * from partitioned_table_timestamp where c = 
TIMESTAMP '2021-01-01 00:00:00'");
-        assertThat(plan2)
-                .contains(
-                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_timestamp, filter=[=(c, 2021-01-01 00:00:00:TIMESTAMP(3))], 
project=[a, b]]], fields=[a, b])");
-
-        org.apache.flink.util.CloseableIterator<Row> rowIter2 =
-                tEnv.executeSql(
-                                "select * from partitioned_table_timestamp 
where c = TIMESTAMP '2021-01-01 00:00:00'")
-                        .collect();
-        assertResultsIgnoreOrder(rowIter2, expectedRowValues2, true);
-
-        // Test TIMESTAMP_LTZ partition type
-        tEnv.executeSql(
-                "create table partitioned_table_timestamp_ltz"
-                        + " (a int not null, b varchar, c timestamp_ltz(3), 
primary key (a, c) NOT ENFORCED) partitioned by (c) ");
-        TablePath tablePath1 = TablePath.of(DEFAULT_DB, 
"partitioned_table_timestamp_ltz");
-
-        tEnv.executeSql(
-                String.format(
-                        "alter table partitioned_table_timestamp_ltz add 
partition (c=%d)",
-                        timestamp1Millis));
-        tEnv.executeSql(
-                String.format(
-                        "alter table partitioned_table_timestamp_ltz add 
partition (c=%d)",
-                        timestamp2Millis));
-
-        // Write test data manually for timestamp_ltz partitions
-        List<InternalRow> rows1 = new ArrayList<>();
-        List<String> expectedRowValues1 = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            rows1.add(row(i, "v1", timestamp1));
-            expectedRowValues1.add(String.format("+I[%d, v1, 
2021-01-01T00:00:00Z]", i));
-        }
-        for (int i = 0; i < 10; i++) {
-            rows1.add(row(i, "v1", timestamp2));
-        }
-        writeRows(conn, tablePath1, rows1, false);
-        waitUntilAllBucketFinishSnapshot(
-                admin, tablePath1, Arrays.asList("2021-01-01-00-00-00_", 
"2021-01-02-00-00-00_"));
-
-        // TODO add test for timestamp_ltz
-        // Test query with TIMESTAMP_LTZ literal
-        String plan1 =
-                tEnv.explainSql(
-                        "select * from partitioned_table_timestamp_ltz where c 
= cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3)) ");
-        CloseableIterator<Row> collect =
-                tEnv.executeSql("select cast(TIMESTAMP'2021-01-01 08:00:00' as 
timestamp_ltz(3))")
-                        .collect();
-        org.apache.flink.util.CloseableIterator<Row> rowIter1 =
-                tEnv.executeSql(
-                                "select * from partitioned_table_timestamp_ltz 
where c = cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3))")
-                        .collect();
-        assertResultsIgnoreOrder(rowIter1, expectedRowValues1, true);
-    }
-
-    @Test
-    void testStreamingReadMultiTypePartitionPushDown() throws Exception {
-        // Create a table with multiple partition fields of different types
-        tEnv.executeSql(
-                "create table multi_type_partitioned_table"
-                        + " (id int not null, name varchar, year_val int, 
month_val varchar, day_val bigint, is_active boolean, "
-                        + "primary key (id, year_val, month_val, day_val, 
is_active) NOT ENFORCED) "
-                        + "partitioned by (year_val, month_val, day_val, 
is_active) ");
-        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"multi_type_partitioned_table");
-
-        // Add partitions with different types
-        tEnv.executeSql(
-                "alter table multi_type_partitioned_table add partition 
(year_val=2025, month_val='01', day_val=15, is_active='true')");
-        tEnv.executeSql(
-                "alter table multi_type_partitioned_table add partition 
(year_val=2025, month_val='02', day_val=20, is_active='false')");
-        tEnv.executeSql(
-                "alter table multi_type_partitioned_table add partition 
(year_val=2026, month_val='01', day_val=10, is_active='true')");
-
-        // Write test data to partitions
-        List<InternalRow> rows = new ArrayList<>();
-        List<String> expectedRowValues = new ArrayList<>();
-
-        // Data for partition (2025, '01', 15, true)
-        for (int i = 1; i <= 3; i++) {
-            rows.add(row(i, "name" + i, 2025, "01", 15L, true));
-            expectedRowValues.add(String.format("+I[%d, name%d, 2025, 01, 15, 
true]", i, i));
-        }
-
-        // Data for partition (2025, '02', 20, false)
-        for (int i = 4; i <= 6; i++) {
-            rows.add(row(i, "name" + i, 2025, "02", 20L, false));
-        }
-
-        // Data for partition (2026, '01', 10, true)
-        for (int i = 7; i <= 9; i++) {
-            rows.add(row(i, "name" + i, 2026, "01", 10L, true));
-        }
-
-        writeRows(conn, tablePath, rows, false);
-        waitUntilAllBucketFinishSnapshot(
-                admin,
-                tablePath,
-                Arrays.asList("2025$01$15$true", "2025$02$20$false", 
"2026$01$10$true"));
-
-        // Test 1: Filter by integer partition field
-        String plan1 =
-                tEnv.explainSql("select * from multi_type_partitioned_table 
where year_val = 2025");
-        assertThat(plan1)
-                .contains(
-                        "TableSourceScan(table=[[testcatalog, defaultdb, 
multi_type_partitioned_table, filter=[=(year_val, 2025)], project=[id, name, 
month_val, day_val, is_active]]], fields=[id, name, month_val, day_val, 
is_active])");
-
-        org.apache.flink.util.CloseableIterator<Row> rowIter1 =
-                tEnv.executeSql("select * from multi_type_partitioned_table 
where year_val = 2025")
-                        .collect();
-
-        List<String> expected1 = new ArrayList<>();
-        expected1.addAll(expectedRowValues); // Data from (2025, '01', 15, 
true)
-        for (int i = 4; i <= 6; i++) {
-            expected1.add(String.format("+I[%d, name%d, 2025, 02, 20, false]", 
i, i));
-        }
-        assertResultsIgnoreOrder(rowIter1, expected1, true);
-
-        // Test 2: Filter by string partition field
-        String plan2 =
-                tEnv.explainSql(
-                        "select * from multi_type_partitioned_table where 
month_val = '01'");
-        assertThat(plan2)
-                .contains(
-                        "TableSourceScan(table=[[testcatalog, defaultdb, 
multi_type_partitioned_table, filter=[=(month_val, 
_UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], project=[id, 
name, year_val, day_val, is_active]]], fields=[id, name, year_val, day_val, 
is_active])");
-
-        org.apache.flink.util.CloseableIterator<Row> rowIter2 =
-                tEnv.executeSql("select * from multi_type_partitioned_table 
where month_val = '01'")
-                        .collect();
-
-        List<String> expected2 = new ArrayList<>();
-        expected2.addAll(expectedRowValues); // Data from (2025, '01', 15, 
true)
-        for (int i = 7; i <= 9; i++) {
-            expected2.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", 
i, i));
-        }
-        assertResultsIgnoreOrder(rowIter2, expected2, true);
-
-        // Test 3: Filter by bigint partition field
-        String plan3 =
-                tEnv.explainSql("select * from multi_type_partitioned_table 
where day_val = 15");
-        assertThat(plan3)
-                .contains(
-                        "TableSourceScan(table=[[testcatalog, defaultdb, 
multi_type_partitioned_table, filter=[=(day_val, 15:BIGINT)], project=[id, 
name, year_val, month_val, is_active]]], fields=[id, name, year_val, month_val, 
is_active])");
-
-        org.apache.flink.util.CloseableIterator<Row> rowIter3 =
-                tEnv.executeSql("select * from multi_type_partitioned_table 
where day_val = 15")
-                        .collect();
-        assertResultsIgnoreOrder(rowIter3, expectedRowValues, true);
-
-        // Test 4: Filter by boolean partition field
-        String plan4 =
-                tEnv.explainSql(
-                        "select * from multi_type_partitioned_table where 
is_active is true");
-        assertThat(plan4)
-                .contains(
-                        "TableSourceScan(table=[[testcatalog, defaultdb, 
multi_type_partitioned_table, filter=[is_active]]], fields=[id, name, year_val, 
month_val, day_val, is_active]");
-
-        org.apache.flink.util.CloseableIterator<Row> rowIter4 =
-                tEnv.executeSql("select * from multi_type_partitioned_table 
where is_active = true")
-                        .collect();
-
-        List<String> expected4 = new ArrayList<>();
-        expected4.addAll(expectedRowValues); // Data from (2025, '01', 15, 
true)
-        for (int i = 7; i <= 9; i++) {
-            expected4.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", 
i, i));
-        }
-        assertResultsIgnoreOrder(rowIter4, expected4, true);
-
-        // Test 5: Complex filter with multiple partition fields
-        String plan5 =
-                tEnv.explainSql(
-                        "select * from multi_type_partitioned_table where 
year_val = 2025 and month_val = '01' and is_active = true");
-        assertThat(plan5)
-                .contains(
-                        "TableSourceScan(table=[[testcatalog, defaultdb, 
multi_type_partitioned_table, filter=[and(and(=(year_val, 2025), =(month_val, 
_UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), is_active)], 
project=[id, name, day_val, is_active]]], fields=[id, name, day_val, 
is_active])");
-
-        org.apache.flink.util.CloseableIterator<Row> rowIter5 =
-                tEnv.executeSql(
-                                "select * from multi_type_partitioned_table 
where year_val = 2025 and month_val = '01' and is_active = true")
-                        .collect();
-        assertResultsIgnoreOrder(rowIter5, expectedRowValues, true);
-    }
-
     @Test
     void testStreamingReadMultiPartitionPushDown() throws Exception {
         tEnv.executeSql(
@@ -1720,8 +1518,6 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                     for (String partition : partitions) {
                         KvSnapshots snapshots =
                                 admin.getLatestKvSnapshots(tablePath, 
partition).get();
-                        List<PartitionInfo> partitionInfos =
-                                admin.listPartitionInfos(tablePath).get();
                         for (int bucketId : snapshots.getBucketIds()) {
                             if 
(!snapshots.getSnapshotId(bucketId).isPresent()) {
                                 return false;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index 633443a08..d76fe2369 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -257,7 +257,7 @@ public class FlinkTestBase extends AbstractTestBase {
     }
 
     public static List<String> writeRowsToPartition(
-            Connection connection, TablePath tablePath, Collection<Object> 
partitions)
+            Connection connection, TablePath tablePath, Collection<String> 
partitions)
             throws Exception {
         List<InternalRow> rows = new ArrayList<>();
         List<String> expectedRowValues = new ArrayList<>();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java
index 13eaf5b86..76c7f5d68 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java
@@ -27,6 +27,7 @@ import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.GenericRow;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -52,7 +53,6 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.DataTypes.STRING;
-import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -579,6 +579,18 @@ public class PredicateConverterTest {
         return new FieldReferenceExpression("f" + i, type, Integer.MAX_VALUE, 
Integer.MAX_VALUE);
     }
 
+    private static ValueLiteralExpression literal(Object v, DataType type) {
+        if (v != null) {
+            return ApiExpressionUtils.valueLiteral(v, type.notNull());
+        } else {
+            return ApiExpressionUtils.valueLiteral(null, type.nullable());
+        }
+    }
+
+    private static ValueLiteralExpression literal(Object v) {
+        return new ValueLiteralExpression(v);
+    }
+
     private static CallExpression call(FunctionDefinition function, 
ResolvedExpression... args) {
         return new CallExpression(false, null, function, Arrays.asList(args), 
DataTypes.BOOLEAN());
     }
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index b79232fad..c4fa15e05 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -394,6 +394,7 @@
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude>
+                                        
<exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude>
                                         <exclude>
                                             
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
                                         </exclude>
diff --git a/website/docs/engine-flink/reads.md 
b/website/docs/engine-flink/reads.md
index 09c65e2ab..bcb2c2475 100644
--- a/website/docs/engine-flink/reads.md
+++ b/website/docs/engine-flink/reads.md
@@ -87,9 +87,24 @@ Partition pruning is an optimization technique for Fluss 
partitioned tables. It
 This optimization is especially useful in streaming scenarios for [Multi-Field 
Partitioned 
Tables](table-design/data-distribution/partitioning.md#multi-field-partitioned-tables)
 that have many partitions.
 The partition pruning also supports dynamically pruning newly created partitions 
during streaming read.
 
-:::note
-1. Currently, **only equality conditions** (e.g., `c_nationkey = 'US'`) are 
supported for partition pruning. Operators like `<`, `>`, `OR`, and `IN` are 
not yet supported.
-:::
+The supported filter operators for partition pruning on the partition fields 
are:
+- `=`
+- `>`
+- `<`
+- `>=`
+- `<=`
+- `<>`
+- `IN (...)`
+- `NOT IN (...)`
+- `IS NULL`
+- `IS NOT NULL`
+- `IS TRUE`
+- `IS FALSE`
+- `LIKE 'abc%'` for prefix matching
+- `LIKE '%abc'` for suffix matching
+- `LIKE '%abc%'` for substring matching
+- OR combinations of filter conditions
+- AND conjunctions of filter conditions
 
 #### Example
 
diff --git a/website/src/css/custom.css b/website/src/css/custom.css
index 6efba04d8..52bebc647 100644
--- a/website/src/css/custom.css
+++ b/website/src/css/custom.css
@@ -131,7 +131,17 @@
           color: #4c576c;
       }
     }
-    
+
+    li {
+        code {
+            border-radius: 4px;
+            background-color: #edf2fa;
+            border: none;
+            padding: 3px 4px;
+            font-size: 14px;
+            color: #4c576c;
+        }
+    }
 
 /*    pre {
         code {

Reply via email to