This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit a688465f3394c62928512e23a234e0bcc182e155
Author: Alibaba-HZY <[email protected]>
AuthorDate: Wed Sep 10 17:41:53 2025 +0800

    [flink] Support general predicates for partition pushdown in Flink 
Connector (#420)
---
 LICENSE                                            |   1 +
 .../org/apache/fluss/predicate/LeafPredicate.java  |   5 +
 .../org/apache/fluss/predicate/SimpleColStats.java |  87 +++
 .../java/org/apache/fluss/utils/TypeUtils.java     |   6 +
 .../fluss/predicate/SimpleColStatsTestUtils.java   |  38 ++
 fluss-flink/fluss-flink-common/pom.xml             |  16 +
 .../apache/fluss/flink/row/FlinkAsFlussRow.java    |  11 +
 .../org/apache/fluss/flink/source/FlinkSource.java |  20 +-
 .../fluss/flink/source/FlinkTableSource.java       | 134 +++--
 .../org/apache/fluss/flink/source/FlussSource.java |   4 +-
 .../source/enumerator/FlinkSourceEnumerator.java   | 105 ++--
 .../fluss/flink/utils/PredicateConverter.java      | 328 ++++++++++++
 .../fluss/flink/source/FlinkTableSourceITCase.java | 477 ++++++++++++++++-
 .../enumerator/FlinkSourceEnumeratorTest.java      |  16 +-
 .../apache/fluss/flink/utils/FlinkTestBase.java    |   4 +-
 .../fluss/flink/utils/PredicateConverterTest.java  | 585 +++++++++++++++++++++
 fluss-test-coverage/pom.xml                        |   1 -
 17 files changed, 1728 insertions(+), 110 deletions(-)

diff --git a/LICENSE b/LICENSE
index 481cdf73d..393488043 100644
--- a/LICENSE
+++ b/LICENSE
@@ -361,6 +361,7 @@ Apache Kafka
 
./fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java
 
 Apache Paimon
+./fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/And.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/CompareUtils.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/CompoundPredicate.java
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java 
b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java
index b52a713ec..640c5093b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java
+++ b/fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java
@@ -19,6 +19,7 @@ package org.apache.fluss.predicate;
 
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.DecimalType;
 import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.TimestampType;
@@ -84,6 +85,10 @@ public class LeafPredicate implements Predicate {
         return new LeafPredicate(function, type, fieldIndex, fieldName, 
literals);
     }
 
+    public LeafPredicate copyWithNewLiterals(List<Object> literals) {
+        return new LeafPredicate(function, DataTypes.STRING(), fieldIndex, 
fieldName, literals);
+    }
+
     @Override
     public boolean test(InternalRow row) {
         return function.test(type, get(row, fieldIndex, type), literals);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/predicate/SimpleColStats.java 
b/fluss-common/src/main/java/org/apache/fluss/predicate/SimpleColStats.java
new file mode 100644
index 000000000..435d565dc
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/predicate/SimpleColStats.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.predicate;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * A simple column statistics, supports the following stats.
+ *
+ * <ul>
+ *   <li>min: the minimum value of the column
+ *   <li>max: the maximum value of the column
+ *   <li>nullCount: the number of nulls
+ * </ul>
+ */
+public class SimpleColStats {
+
+    public static final SimpleColStats NONE = new SimpleColStats(null, null, 
null);
+
+    @Nullable private final Object min;
+    @Nullable private final Object max;
+    private final Long nullCount;
+
+    public SimpleColStats(@Nullable Object min, @Nullable Object max, 
@Nullable Long nullCount) {
+        this.min = min;
+        this.max = max;
+        this.nullCount = nullCount;
+    }
+
+    @Nullable
+    public Object min() {
+        return min;
+    }
+
+    @Nullable
+    public Object max() {
+        return max;
+    }
+
+    @Nullable
+    public Long nullCount() {
+        return nullCount;
+    }
+
+    public boolean isNone() {
+        return min == null && max == null && nullCount == null;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SimpleColStats)) {
+            return false;
+        }
+        SimpleColStats that = (SimpleColStats) o;
+        return Objects.equals(min, that.min)
+                && Objects.equals(max, that.max)
+                && Objects.equals(nullCount, that.nullCount);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(min, max, nullCount);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{%s, %s, %d}", min, max, nullCount);
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java
index 4c344aee1..04283c9d9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java
@@ -21,10 +21,12 @@ import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.TimestampType;
 
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.util.TimeZone;
 
 /** Type related helper functions. */
 public class TypeUtils {
@@ -62,6 +64,10 @@ public class TypeUtils {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 TimestampType timestampType = (TimestampType) type;
                 return BinaryStringUtils.toTimestampNtz(str, 
timestampType.getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                LocalZonedTimestampType localZonedTimestampType = 
(LocalZonedTimestampType) type;
+                return BinaryStringUtils.toTimestampLtz(
+                        str, localZonedTimestampType.getPrecision(), 
TimeZone.getDefault());
             default:
                 throw new UnsupportedOperationException("Unsupported type " + 
type);
         }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/predicate/SimpleColStatsTestUtils.java
 
b/fluss-common/src/test/java/org/apache/fluss/predicate/SimpleColStatsTestUtils.java
new file mode 100644
index 000000000..a0e3ef440
--- /dev/null
+++ 
b/fluss-common/src/test/java/org/apache/fluss/predicate/SimpleColStatsTestUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.predicate;
+
+import org.apache.fluss.row.GenericRow;
+
+/** Utils for testing with {@link SimpleColStats}. */
+public class SimpleColStatsTestUtils {
+
+    public static boolean test(Predicate predicate, long rowCount, 
SimpleColStats[] fieldStats) {
+        Object[] min = new Object[fieldStats.length];
+        Object[] max = new Object[fieldStats.length];
+        Long[] nullCounts = new Long[fieldStats.length];
+        for (int i = 0; i < fieldStats.length; i++) {
+            min[i] = fieldStats[i].min();
+            max[i] = fieldStats[i].max();
+            nullCounts[i] = fieldStats[i].nullCount();
+        }
+
+        return predicate.test(rowCount, GenericRow.of(min), 
GenericRow.of(max), nullCounts);
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/pom.xml 
b/fluss-flink/fluss-flink-common/pom.xml
index c7d59a7f9..aa22e2373 100644
--- a/fluss-flink/fluss-flink-common/pom.xml
+++ b/fluss-flink/fluss-flink-common/pom.xml
@@ -34,6 +34,7 @@
     <properties>
         <flink.major.version>1.20</flink.major.version>
         <flink.minor.version>1.20.1</flink.minor.version>
+        <scala.binary.version>2.12</scala.binary.version>
     </properties>
 
     <dependencies>
@@ -126,6 +127,21 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.minor.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.minor.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-server</artifactId>
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
index 4b48a4ef4..3593d1f10 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.row;
 
+import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
@@ -24,8 +25,10 @@ import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
 
 /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
 public class FlinkAsFlussRow implements InternalRow {
@@ -132,4 +135,12 @@ public class FlinkAsFlussRow implements InternalRow {
     public byte[] getBytes(int pos) {
         return flinkRow.getBinary(pos);
     }
+
+    public static Object fromFlinkObject(Object o, DataType type) {
+        if (o == null) {
+            return null;
+        }
+        return 
InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
+                .getFieldOrNull((new 
FlinkAsFlussRow()).replace(GenericRowData.of(o)));
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index 9687b5efc..bc15390fe 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -30,10 +30,10 @@ import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.flink.source.split.SourceSplitSerializer;
 import 
org.apache.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
 import org.apache.fluss.flink.source.state.SourceEnumeratorState;
-import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.types.RowType;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,10 +50,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import javax.annotation.Nullable;
 
-import java.util.List;
-
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
 /** Flink source for Fluss. */
 public class FlinkSource<OUT>
         implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, 
ResultTypeQueryable {
@@ -69,10 +65,8 @@ public class FlinkSource<OUT>
     protected final long scanPartitionDiscoveryIntervalMs;
     private final boolean streaming;
     private final FlussDeserializationSchema<OUT> deserializationSchema;
-
-    private final List<FieldEqual> partitionFilters;
-
-    private final @Nullable LakeSource<LakeSplit> lakeSource;
+    @Nullable private final Predicate partitionFilters;
+    @Nullable private final LakeSource<LakeSplit> lakeSource;
 
     public FlinkSource(
             Configuration flussConf,
@@ -85,7 +79,7 @@ public class FlinkSource<OUT>
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            @Nullable Predicate partitionFilters) {
         this(
                 flussConf,
                 tablePath,
@@ -112,8 +106,8 @@ public class FlinkSource<OUT>
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
-            LakeSource<LakeSplit> lakeSource) {
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
         this.hasPrimaryKey = hasPrimaryKey;
@@ -124,7 +118,7 @@ public class FlinkSource<OUT>
         this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
         this.deserializationSchema = deserializationSchema;
         this.streaming = streaming;
-        this.partitionFilters = checkNotNull(partitionFilters);
+        this.partitionFilters = partitionFilters;
         this.lakeSource = lakeSource;
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 0f6c4c7e5..3c4464724 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -26,6 +26,7 @@ import 
org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
 import org.apache.fluss.flink.source.lookup.LookupNormalizer;
 import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.flink.utils.PredicateConverter;
 import org.apache.fluss.flink.utils.PushdownUtils;
 import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
@@ -34,11 +35,15 @@ import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.GreaterOrEqual;
 import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.PartitionPredicateVisitor;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.predicate.PredicateVisitor;
+import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.PartitionUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -85,10 +90,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
 import static 
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
-import static 
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
 import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -140,11 +146,11 @@ public class FlinkTableSource
     @Nullable private RowLevelModificationType modificationScanType;
 
     // count(*) push down
-    protected boolean selectRowCount = false;
+    private boolean selectRowCount = false;
 
     private long limit = -1;
 
-    private List<FieldEqual> partitionFilters = Collections.emptyList();
+    @Nullable private Predicate partitionFilters;
 
     private final Map<String, String> tableOptions;
 
@@ -478,6 +484,7 @@ public class FlinkTableSource
                 && startupOptions.startupMode == 
FlinkConnectorOptions.ScanStartupMode.FULL
                 && hasPrimaryKey()
                 && filters.size() == primaryKeyIndexes.length) {
+
             Map<Integer, LogicalType> primaryKeyTypes = getPrimaryKeyTypes();
             List<FieldEqual> fieldEquals =
                     extractFieldEquals(
@@ -499,52 +506,72 @@ public class FlinkTableSource
             }
             singleRowFilter = lookupRow;
             return Result.of(acceptedFilters, remainingFilters);
-        } else if (isPartitioned()) {
-            // dynamic partition pushdown
-            List<FieldEqual> fieldEquals =
-                    extractFieldEquals(
-                            filters,
-                            getPartitionKeyTypes(),
-                            acceptedFilters,
-                            remainingFilters,
-                            FLUSS_INTERNAL_VALUE);
-
-            // partitions are filtered by string representations, convert the 
equals to string first
-            partitionFilters = stringifyFieldEquals(fieldEquals);
-
+        } else if (isPartitioned()
+                && 
!RowLevelModificationType.UPDATE.equals(modificationScanType)) {
+            // apply partition filter pushdown
+            List<Predicate> converted = new ArrayList<>();
+
+            List<String> fieldNames = tableOutputType.getFieldNames();
+            List<String> partitionKeys =
+                    Arrays.stream(partitionKeyIndexes)
+                            .mapToObj(fieldNames::get)
+                            .collect(Collectors.toList());
+
+            PredicateVisitor<Boolean> partitionPredicateVisitor =
+                    new PartitionPredicateVisitor(partitionKeys);
+
+            LogicalType[] partitionKeyTypes =
+                    Arrays.stream(partitionKeyIndexes)
+                            .mapToObj(producedDataType.getChildren()::get)
+                            .toArray(LogicalType[]::new);
+            for (ResolvedExpression filter : filters) {
+
+                Optional<Predicate> predicateOptional =
+                        PredicateConverter.convert(
+                                
org.apache.flink.table.types.logical.RowType.of(
+                                        partitionKeyTypes, 
partitionKeys.toArray(new String[0])),
+                                filter);
+
+                if (predicateOptional.isPresent()) {
+                    Predicate p = predicateOptional.get();
+                    if (!p.visit(partitionPredicateVisitor)) {
+                        remainingFilters.add(filter);
+                    } else {
+                        acceptedFilters.add(filter);
+                    }
+                    // Convert literals in the predicate to string using
+                    // PartitionUtils.convertValueOfType
+                    p = stringifyPredicate(p);
+                    converted.add(p);
+                } else {
+                    remainingFilters.add(filter);
+                }
+            }
+            partitionFilters = converted.isEmpty() ? null : 
PredicateBuilder.and(converted);
             // lake source is not null
             if (lakeSource != null) {
-                // and exist field equals, push down to lake source
-                if (!fieldEquals.isEmpty()) {
-                    // convert flink row type to fluss row type
-                    RowType flussRowType = 
FlinkConversions.toFlussRowType(tableOutputType);
-
-                    List<Predicate> lakePredicates = new ArrayList<>();
-                    PredicateBuilder predicateBuilder = new 
PredicateBuilder(flussRowType);
-
-                    for (FieldEqual fieldEqual : fieldEquals) {
-                        lakePredicates.add(
-                                predicateBuilder.equal(
-                                        fieldEqual.fieldIndex, 
fieldEqual.equalValue));
-                    }
+                List<Predicate> lakePredicates = new ArrayList<>();
+                for (ResolvedExpression filter : filters) {
+                    Optional<Predicate> predicateOptional =
+                            PredicateConverter.convert(tableOutputType, 
filter);
+                    predicateOptional.ifPresent(lakePredicates::add);
+                }
 
-                    if (!lakePredicates.isEmpty()) {
-                        final LakeSource.FilterPushDownResult 
filterPushDownResult =
-                                lakeSource.withFilters(lakePredicates);
-                        if (filterPushDownResult.acceptedPredicates().size()
-                                != lakePredicates.size()) {
-                            LOG.info(
-                                    "LakeSource rejected some partition 
filters. Falling back to Flink-side filtering.");
-                            // Flink will apply all filters to preserve 
correctness
-                            return Result.of(Collections.emptyList(), filters);
-                        }
+                if (!lakePredicates.isEmpty()) {
+                    final LakeSource.FilterPushDownResult filterPushDownResult 
=
+                            lakeSource.withFilters(lakePredicates);
+                    if (filterPushDownResult.acceptedPredicates().size() != 
lakePredicates.size()) {
+                        LOG.info(
+                                "LakeSource rejected some partition filters. 
Falling back to Flink-side filtering.");
+                        // Flink will apply all filters to preserve correctness
+                        return Result.of(Collections.emptyList(), filters);
                     }
                 }
             }
             return Result.of(acceptedFilters, remainingFilters);
-        } else {
-            return Result.of(Collections.emptyList(), filters);
         }
+
+        return Result.of(Collections.emptyList(), filters);
     }
 
     @Override
@@ -651,4 +678,29 @@ public class FlinkTableSource
     public int[] getPartitionKeyIndexes() {
         return partitionKeyIndexes;
     }
+
+    /**
+     * Converts literals in LeafPredicate to string representation using
+     * PartitionUtils.convertValueOfType. This is necessary because partition 
metadata is stored as
+     * string.
+     */
+    private Predicate stringifyPredicate(Predicate predicate) {
+        if (predicate instanceof LeafPredicate) {
+            // Convert literals to string using 
PartitionUtils.convertValueOfType
+            List<Object> convertedLiterals = new ArrayList<>();
+            for (Object literal : ((LeafPredicate) predicate).literals()) {
+                if (literal != null) {
+                    String stringValue =
+                            PartitionUtils.convertValueOfType(
+                                    literal, ((LeafPredicate) 
predicate).type().getTypeRoot());
+                    
convertedLiterals.add(BinaryString.fromString(stringValue));
+                } else {
+                    convertedLiterals.add(null);
+                }
+            }
+            return ((LeafPredicate) 
predicate).copyWithNewLiterals(convertedLiterals);
+        }
+
+        return predicate;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
index 428204d83..427741834 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java
@@ -26,8 +26,6 @@ import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
-
 /**
  * A Flink DataStream source implementation for reading data from Fluss tables.
  *
@@ -82,7 +80,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
                 scanPartitionDiscoveryIntervalMs,
                 deserializationSchema,
                 streaming,
-                Collections.emptyList());
+                null);
     }
 
     /**
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index 867ff442f..de3731224 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -35,14 +35,16 @@ import 
org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
 import org.apache.fluss.flink.source.split.LogSplit;
 import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.flink.source.state.SourceEnumeratorState;
-import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.types.DataField;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.utils.ExceptionUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -133,7 +135,7 @@ public class FlinkSourceEnumerator
 
     private volatile boolean closed = false;
 
-    private final List<FieldEqual> partitionFilters;
+    @Nullable private final Predicate partitionFilters;
 
     @Nullable private final LakeSource<LakeSplit> lakeSource;
 
@@ -146,7 +148,7 @@ public class FlinkSourceEnumerator
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            @Nullable Predicate partitionFilters) {
         this(
                 tablePath,
                 flussConf,
@@ -169,7 +171,7 @@ public class FlinkSourceEnumerator
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
+            @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this(
                 tablePath,
@@ -199,7 +201,7 @@ public class FlinkSourceEnumerator
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
+            @Nullable Predicate partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
@@ -216,7 +218,7 @@ public class FlinkSourceEnumerator
                         : new LinkedList<>(pendingHybridLakeFlussSplits);
         this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
         this.streaming = streaming;
-        this.partitionFilters = checkNotNull(partitionFilters);
+        this.partitionFilters = partitionFilters;
         this.stoppingOffsetsInitializer =
                 streaming ? new NoStoppingOffsetsInitializer() : 
OffsetsInitializer.latest();
         this.lakeSource = lakeSource;
@@ -354,30 +356,43 @@ public class FlinkSourceEnumerator
 
     /** Apply partition filter. */
     private List<PartitionInfo> applyPartitionFilter(List<PartitionInfo> 
partitionInfos) {
-        if (!partitionFilters.isEmpty()) {
-            return partitionInfos.stream()
-                    .filter(
-                            partitionInfo -> {
-                                Map<String, String> specMap =
-                                        
partitionInfo.getPartitionSpec().getSpecMap();
-                                // use getFields() instead of getFieldNames() 
to
-                                // avoid collection construction
-                                List<DataField> fields = 
tableInfo.getRowType().getFields();
-                                for (FieldEqual filter : partitionFilters) {
-                                    String fieldName = 
fields.get(filter.fieldIndex).getName();
-                                    String partitionValue = 
specMap.get(fieldName);
-                                    if (partitionValue == null
-                                            || !filter.equalValue
-                                                    .toString()
-                                                    .equals(partitionValue)) {
-                                        return false;
-                                    }
-                                }
-                                return true;
-                            })
-                    .collect(Collectors.toList());
-        }
-        return partitionInfos;
+        if (partitionFilters == null) {
+            return partitionInfos;
+        } else {
+            int originalSize = partitionInfos.size();
+            List<PartitionInfo> filteredPartitionInfos =
+                    partitionInfos.stream()
+                            .filter(partition -> 
partitionFilters.test(toInternalRow(partition)))
+                            .collect(Collectors.toList());
+
+            int filteredSize = filteredPartitionInfos.size();
+            if (originalSize != filteredSize) {
+                LOG.debug(
+                        "Applied partition filter for table {}: {} partitions 
filtered down to {} "
+                                + "matching partitions with predicate: {}. 
Matching partitions after filtering: {}",
+                        tablePath,
+                        originalSize,
+                        filteredSize,
+                        partitionFilters,
+                        filteredPartitionInfos);
+            } else {
+                LOG.debug(
+                        "Partition filter applied for table {}, but all {} 
partitions matched the predicate",
+                        tablePath,
+                        originalSize);
+            }
+            return filteredPartitionInfos;
+        }
+    }
+
+    private static InternalRow toInternalRow(PartitionInfo partitionInfo) {
+        List<String> partitionValues =
+                partitionInfo.getResolvedPartitionSpec().getPartitionValues();
+        GenericRow genericRow = new GenericRow(partitionValues.size());
+        for (int i = 0; i < partitionValues.size(); i++) {
+            genericRow.setField(i, 
BinaryString.fromString(partitionValues.get(i)));
+        }
+        return genericRow;
     }
 
     /** Init the splits for Fluss. */
@@ -390,17 +405,39 @@ public class FlinkSourceEnumerator
             LOG.error("Failed to list partitions for {}", tablePath, t);
             return;
         }
+
+        LOG.debug(
+                "Checking partition changes for table {}, found {} partitions",
+                tablePath,
+                partitionInfos.size());
+
         final PartitionChange partitionChange = 
getPartitionChange(partitionInfos);
         if (partitionChange.isEmpty()) {
+            LOG.debug("No partition changes detected for table {}", tablePath);
             return;
         }
 
         // handle removed partitions
-        handlePartitionsRemoved(partitionChange.removedPartitions);
+        if (!partitionChange.removedPartitions.isEmpty()) {
+            LOG.info(
+                    "Handling {} removed partitions for table {}: {}",
+                    partitionChange.removedPartitions.size(),
+                    tablePath,
+                    partitionChange.removedPartitions);
+            handlePartitionsRemoved(partitionChange.removedPartitions);
+        }
 
         // handle new partitions
-        context.callAsync(
-                () -> initPartitionedSplits(partitionChange.newPartitions), 
this::handleSplitsAdd);
+        if (!partitionChange.newPartitions.isEmpty()) {
+            LOG.info(
+                    "Handling {} new partitions for table {}: {}",
+                    partitionChange.newPartitions.size(),
+                    tablePath,
+                    partitionChange.newPartitions);
+            context.callAsync(
+                    () -> initPartitionedSplits(partitionChange.newPartitions),
+                    this::handleSplitsAdd);
+        }
     }
 
     private PartitionChange getPartitionChange(Set<PartitionInfo> 
fetchedPartitionInfos) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
new file mode 100644
index 000000000..1a7939a14
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.flink.row.FlinkAsFlussRow;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.predicate.UnsupportedExpression;
+import org.apache.fluss.utils.TypeUtils;
+
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.regex.Pattern;
+
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+
+/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Convert Flink {@link Expression} to Fluss {@link Predicate}.
+ *
+ * <p>For {@link FieldReferenceExpression}, please use name instead of index, 
if the project
+ * pushdown is before and the filter pushdown is after, the index of the 
filter will be projected.
+ */
+public class PredicateConverter implements ExpressionVisitor<Predicate> {
+
+    private final PredicateBuilder builder;
+
+    public PredicateConverter(RowType type) {
+        this(new PredicateBuilder(FlinkConversions.toFlussRowType(type)));
+    }
+
+    public PredicateConverter(PredicateBuilder builder) {
+        this.builder = builder;
+    }
+
+    /** Accepts simple LIKE patterns like "abc%". */
+    private static final Pattern BEGIN_PATTERN = Pattern.compile("^[^%_]+%$");
+
+    private static final Pattern END_PATTERN = Pattern.compile("^%[^%_]+$");
+    private static final Pattern CONTAINS_PATTERN = 
Pattern.compile("^%[^%_]+%$");
+
+    @Override
+    public Predicate visit(CallExpression call) {
+        FunctionDefinition func = call.getFunctionDefinition();
+        List<Expression> children = call.getChildren();
+
+        if (func == BuiltInFunctionDefinitions.AND) {
+            return PredicateBuilder.and(children.get(0).accept(this), 
children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.OR) {
+            return PredicateBuilder.or(children.get(0).accept(this), 
children.get(1).accept(this));
+        } else if (func == BuiltInFunctionDefinitions.NOT) {
+            return visitNotFunction(children, builder::equal, builder::equal);
+        } else if (func == BuiltInFunctionDefinitions.EQUALS) {
+            return visitBiFunction(children, builder::equal, builder::equal);
+        } else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
+            return visitBiFunction(children, builder::notEqual, 
builder::notEqual);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN) {
+            return visitBiFunction(children, builder::greaterThan, 
builder::lessThan);
+        } else if (func == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
+            return visitBiFunction(children, builder::greaterOrEqual, 
builder::lessOrEqual);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN) {
+            return visitBiFunction(children, builder::lessThan, 
builder::greaterThan);
+        } else if (func == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL) {
+            return visitBiFunction(children, builder::lessOrEqual, 
builder::greaterOrEqual);
+        } else if (func == BuiltInFunctionDefinitions.IN) {
+            FieldReferenceExpression fieldRefExpr =
+                    
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+            List<Object> literals = new ArrayList<>();
+            for (int i = 1; i < children.size(); i++) {
+                literals.add(extractLiteral(fieldRefExpr.getOutputDataType(), 
children.get(i)));
+            }
+            return builder.in(builder.indexOf(fieldRefExpr.getName()), 
literals);
+        } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
+                    .map(builder::isNull)
+                    .orElseThrow(UnsupportedExpression::new);
+        } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
+            return extractFieldReference(children.get(0))
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
+                    .map(builder::isNotNull)
+                    .orElseThrow(UnsupportedExpression::new);
+        } else if (func == BuiltInFunctionDefinitions.BETWEEN) {
+            FieldReferenceExpression fieldRefExpr =
+                    
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+            return builder.between(
+                    builder.indexOf(fieldRefExpr.getName()), children.get(1), 
children.get(2));
+        } else if (func == BuiltInFunctionDefinitions.LIKE) {
+            FieldReferenceExpression fieldRefExpr =
+                    
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
+            if (fieldRefExpr
+                            .getOutputDataType()
+                            .getLogicalType()
+                            .getTypeRoot()
+                            .getFamilies()
+                            .contains(LogicalTypeFamily.CHARACTER_STRING)
+                    && builder.indexOf(fieldRefExpr.getName()) != -1) {
+                String sqlPattern =
+                        Objects.requireNonNull(
+                                        extractLiteral(
+                                                
fieldRefExpr.getOutputDataType(), children.get(1)))
+                                .toString();
+                String escape =
+                        children.size() <= 2
+                                ? null
+                                : Objects.requireNonNull(
+                                                extractLiteral(
+                                                        
fieldRefExpr.getOutputDataType(),
+                                                        children.get(2)))
+                                        .toString();
+
+                if (escape == null) {
+                    if (BEGIN_PATTERN.matcher(sqlPattern).matches()) {
+                        String prefix = sqlPattern.substring(0, 
sqlPattern.length() - 1);
+                        return 
builder.startsWith(builder.indexOf(fieldRefExpr.getName()), prefix);
+                    }
+                    if (END_PATTERN.matcher(sqlPattern).matches()) {
+                        String suffix = sqlPattern.substring(1);
+                        return 
builder.endsWith(builder.indexOf(fieldRefExpr.getName()), suffix);
+                    }
+                    if (CONTAINS_PATTERN.matcher(sqlPattern).matches()
+                            && sqlPattern.indexOf('%', 1) == 
sqlPattern.length() - 1) {
+                        String mid = sqlPattern.substring(1, 
sqlPattern.length() - 1);
+                        return 
builder.contains(builder.indexOf(fieldRefExpr.getName()), mid);
+                    }
+                }
+            }
+        }
+
+        // TODO is_xxx, between_xxx, similar, in, not_in, not?
+
+        throw new UnsupportedExpression();
+    }
+
+    private Predicate visitBiFunction(
+            List<Expression> children,
+            BiFunction<Integer, Object, Predicate> visit1,
+            BiFunction<Integer, Object, Predicate> visit2) {
+        Optional<FieldReferenceExpression> fieldRefExpr = 
extractFieldReference(children.get(0));
+        if (fieldRefExpr.isPresent() && 
builder.indexOf(fieldRefExpr.get().getName()) != -1) {
+            Object literal =
+                    extractLiteral(fieldRefExpr.get().getOutputDataType(), 
children.get(1));
+            return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), 
literal);
+        } else {
+            fieldRefExpr = extractFieldReference(children.get(1));
+            if (fieldRefExpr.isPresent()) {
+                Object literal =
+                        extractLiteral(fieldRefExpr.get().getOutputDataType(), 
children.get(0));
+                return 
visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), literal);
+            }
+        }
+
+        throw new UnsupportedExpression();
+    }
+
+    private Predicate visitNotFunction(
+            List<Expression> children,
+            BiFunction<Integer, Object, Predicate> visit1,
+            BiFunction<Integer, Object, Predicate> visit2) {
+        Optional<FieldReferenceExpression> fieldRefExpr = 
extractFieldReference(children.get(0));
+        if (fieldRefExpr.isPresent() && 
builder.indexOf(fieldRefExpr.get().getName()) != -1) {
+
+            return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), 
false);
+        } else {
+            fieldRefExpr = extractFieldReference(children.get(1));
+            if (fieldRefExpr.isPresent()) {
+                return 
visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), false);
+            }
+        }
+
+        throw new UnsupportedExpression();
+    }
+
+    private Optional<FieldReferenceExpression> 
extractFieldReference(Expression expression) {
+        if (expression instanceof FieldReferenceExpression) {
+            return Optional.of((FieldReferenceExpression) expression);
+        }
+        return Optional.empty();
+    }
+
+    private Object extractLiteral(DataType expectedType, Expression 
expression) {
+        LogicalType expectedLogicalType = expectedType.getLogicalType();
+        if (!supportsPredicate(expectedLogicalType)) {
+            throw new UnsupportedExpression();
+        }
+
+        if (expression instanceof ValueLiteralExpression) {
+            ValueLiteralExpression valueExpression = (ValueLiteralExpression) 
expression;
+            if (valueExpression.isNull()) {
+                return null;
+            }
+
+            DataType actualType = valueExpression.getOutputDataType();
+            LogicalType actualLogicalType = actualType.getLogicalType();
+            Optional<?> valueOpt = 
valueExpression.getValueAs(actualType.getConversionClass());
+            if (valueOpt.isPresent()) {
+                Object value = valueOpt.get();
+                if 
(actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) {
+                    return FlinkAsFlussRow.fromFlinkObject(
+                            DataStructureConverters.getConverter(expectedType)
+                                    .toInternalOrNull(value),
+                            expectedType);
+                } else if (supportsImplicitCast(actualLogicalType, 
expectedLogicalType)) {
+                    try {
+                        return TypeUtils.castFromString(
+                                value.toString(), 
FlinkConversions.toFlussType(expectedType));
+                    } catch (Exception ignored) {
+                    }
+                }
+            }
+        }
+
+        throw new UnsupportedExpression();
+    }
+
+    private boolean isStringType(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private boolean supportsPredicate(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    @Override
+    public Predicate visit(ValueLiteralExpression valueLiteralExpression) {
+        throw new UnsupportedExpression();
+    }
+
+    @Override
+    public Predicate visit(FieldReferenceExpression fieldReferenceExpression) {
+        if (fieldReferenceExpression.getOutputDataType().getLogicalType() 
instanceof BooleanType) {
+            return 
builder.equal(builder.indexOf(fieldReferenceExpression.getName()), true);
+        }
+        throw new UnsupportedExpression();
+    }
+
+    @Override
+    public Predicate visit(TypeLiteralExpression typeLiteralExpression) {
+        throw new UnsupportedExpression();
+    }
+
+    @Override
+    public Predicate visit(Expression expression) {
+        throw new UnsupportedExpression();
+    }
+
+    /**
+     * Try best to convert a {@link ResolvedExpression} to {@link Predicate}.
+     *
+     * @param filter a resolved expression
+     * @return {@link Predicate} if no {@link UnsupportedExpression} thrown.
+     */
+    public static Optional<Predicate> convert(RowType rowType, 
ResolvedExpression filter) {
+        try {
+            return Optional.ofNullable(filter.accept(new 
PredicateConverter(rowType)));
+        } catch (UnsupportedExpression e) {
+            return Optional.empty();
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index c279c15b3..03bdc836c 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -25,9 +25,12 @@ import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.clock.ManualClock;
 
@@ -604,7 +607,8 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         }
 
         List<String> expectedRowValues =
-                writeRowsToPartition(conn, tablePath, 
partitionNameById.values());
+                writeRowsToPartition(
+                        conn, tablePath, 
Collections.singleton(partitionNameById.values()));
         waitUntilAllBucketFinishSnapshot(admin, tablePath, 
partitionNameById.values());
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -961,6 +965,262 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
    @Test
    void testStreamingReadDatePartitionTypePushDown() throws Exception {
        // DATE partition values are specified as epoch days: c=4 -> 1970-01-05, c=5 -> 1970-01-06.
        tEnv.executeSql(
                "create table partitioned_table_long"
                        + " (a int not null, b varchar, c date, primary key (a, c) NOT ENFORCED) partitioned by (c) ");
        TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_long");
        tEnv.executeSql("alter table partitioned_table_long add partition (c=4)");
        tEnv.executeSql("alter table partitioned_table_long add partition (c=5)");

        // Keep only rows written to partition c=4 and rewrite the epoch-day suffix to the
        // rendered DATE string, matching the filter DATE'1970-01-05' used below.
        List<String> expectedRowValues =
                writeRowsToPartition(conn, tablePath, Arrays.asList(4, 5)).stream()
                        .filter(s -> s.endsWith("4]"))
                        .map(s -> s.replace("4]", "1970-01-05]"))
                        .collect(Collectors.toList());
        waitUntilAllBucketFinishSnapshot(
                admin, tablePath, Arrays.asList("1970-01-05", "1970-01-06"));

        // The DATE equality predicate must be pushed into the source scan and the partition
        // column pruned from the projection.
        String plan =
                tEnv.explainSql("select * from partitioned_table_long where c =DATE'1970-01-05'");
        assertThat(plan)
                .contains(
                        "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_long, filter=[=(c, 1970-01-05)], project=[a, b]]], fields=[a, b])");

        org.apache.flink.util.CloseableIterator<Row> rowIter =
                tEnv.executeSql("select * from partitioned_table_long where c =DATE'1970-01-05'")
                        .collect();

        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
    }
+
+    @Test
+    void testStreamingReadTimestampPartitionTypePushDown() throws Exception {
+        // Add partitions with timestamp values (using epoch milliseconds)
+        long timestamp1Millis = 1609459200000L; // 2021-01-01 00:00:00 UTC
+        long timestamp2Millis = 1609545600000L; // 2021-01-02 00:00:00 UTC
+        TimestampLtz timestamp1 = 
TimestampLtz.fromEpochMillis(timestamp1Millis);
+        TimestampLtz timestamp2 = 
TimestampLtz.fromEpochMillis(timestamp2Millis);
+
+        // Test TIMESTAMP partition type
+        tEnv.executeSql(
+                "create table partitioned_table_timestamp"
+                        + " (a int not null, b varchar, c timestamp(3), 
primary key (a, c) NOT ENFORCED) partitioned by (c) ");
+        TablePath tablePath2 = TablePath.of(DEFAULT_DB, 
"partitioned_table_timestamp");
+
+        // Add partitions with timestamp values
+        TimestampNtz timestampNtz1 = TimestampNtz.fromMillis(timestamp1Millis);
+        TimestampNtz timestampNtz2 = TimestampNtz.fromMillis(timestamp2Millis);
+
+        tEnv.executeSql(
+                String.format(
+                        "alter table partitioned_table_timestamp add partition 
(c=%d)",
+                        timestamp1Millis));
+        tEnv.executeSql(
+                String.format(
+                        "alter table partitioned_table_timestamp add partition 
(c=%d)",
+                        timestamp2Millis));
+
+        // Write test data manually for timestamp partitions
+        List<InternalRow> rows2 = new ArrayList<>();
+        List<String> expectedRowValues2 = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            rows2.add(row(i, "v1", timestampNtz1));
+            expectedRowValues2.add(String.format("+I[%d, v1, 
2021-01-01T00:00]", i));
+        }
+        for (int i = 0; i < 10; i++) {
+            rows2.add(row(i, "v1", timestampNtz2));
+        }
+        writeRows(conn, tablePath2, rows2, false);
+        waitUntilAllBucketFinishSnapshot(
+                admin, tablePath2, Arrays.asList("2021-01-01-00-00-00_", 
"2021-01-02-00-00-00_"));
+
+        // Test query with TIMESTAMP literal
+        String plan2 =
+                tEnv.explainSql(
+                        "select * from partitioned_table_timestamp where c = 
TIMESTAMP '2021-01-01 00:00:00'");
+        assertThat(plan2)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_timestamp, filter=[=(c, 2021-01-01 00:00:00:TIMESTAMP(3))], 
project=[a, b]]], fields=[a, b])");
+
+        org.apache.flink.util.CloseableIterator<Row> rowIter2 =
+                tEnv.executeSql(
+                                "select * from partitioned_table_timestamp 
where c = TIMESTAMP '2021-01-01 00:00:00'")
+                        .collect();
+        assertResultsIgnoreOrder(rowIter2, expectedRowValues2, true);
+
+        // Test TIMESTAMP_LTZ partition type
+        tEnv.executeSql(
+                "create table partitioned_table_timestamp_ltz"
+                        + " (a int not null, b varchar, c timestamp_ltz(3), 
primary key (a, c) NOT ENFORCED) partitioned by (c) ");
+        TablePath tablePath1 = TablePath.of(DEFAULT_DB, 
"partitioned_table_timestamp_ltz");
+
+        tEnv.executeSql(
+                String.format(
+                        "alter table partitioned_table_timestamp_ltz add 
partition (c=%d)",
+                        timestamp1Millis));
+        tEnv.executeSql(
+                String.format(
+                        "alter table partitioned_table_timestamp_ltz add 
partition (c=%d)",
+                        timestamp2Millis));
+
+        // Write test data manually for timestamp_ltz partitions
+        List<InternalRow> rows1 = new ArrayList<>();
+        List<String> expectedRowValues1 = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            rows1.add(row(i, "v1", timestamp1));
+            expectedRowValues1.add(String.format("+I[%d, v1, 
2021-01-01T00:00:00Z]", i));
+        }
+        for (int i = 0; i < 10; i++) {
+            rows1.add(row(i, "v1", timestamp2));
+        }
+        writeRows(conn, tablePath1, rows1, false);
+        waitUntilAllBucketFinishSnapshot(
+                admin, tablePath1, Arrays.asList("2021-01-01-00-00-00_", 
"2021-01-02-00-00-00_"));
+
+        // TODO add test for timestamp_ltz
+        // Test query with TIMESTAMP_LTZ literal
+        String plan1 =
+                tEnv.explainSql(
+                        "select * from partitioned_table_timestamp_ltz where c 
= cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3)) ");
+        CloseableIterator<Row> collect =
+                tEnv.executeSql("select cast(TIMESTAMP'2021-01-01 08:00:00' as 
timestamp_ltz(3))")
+                        .collect();
+        org.apache.flink.util.CloseableIterator<Row> rowIter1 =
+                tEnv.executeSql(
+                                "select * from partitioned_table_timestamp_ltz 
where c = cast(TIMESTAMP'2021-01-01 08:00:00' as timestamp_ltz(3))")
+                        .collect();
+        assertResultsIgnoreOrder(rowIter1, expectedRowValues1, true);
+    }
+
    @Test
    void testStreamingReadMultiTypePartitionPushDown() throws Exception {
        // Create a table with multiple partition fields of different types (int, varchar,
        // bigint, boolean) to verify pushdown works per type and in combination.
        tEnv.executeSql(
                "create table multi_type_partitioned_table"
                        + " (id int not null, name varchar, year_val int, month_val varchar, day_val bigint, is_active boolean, "
                        + "primary key (id, year_val, month_val, day_val, is_active) NOT ENFORCED) "
                        + "partitioned by (year_val, month_val, day_val, is_active) ");
        TablePath tablePath = TablePath.of(DEFAULT_DB, "multi_type_partitioned_table");

        // Add partitions with different types
        tEnv.executeSql(
                "alter table multi_type_partitioned_table add partition (year_val=2025, month_val='01', day_val=15, is_active='true')");
        tEnv.executeSql(
                "alter table multi_type_partitioned_table add partition (year_val=2025, month_val='02', day_val=20, is_active='false')");
        tEnv.executeSql(
                "alter table multi_type_partitioned_table add partition (year_val=2026, month_val='01', day_val=10, is_active='true')");

        // Write test data to partitions
        List<InternalRow> rows = new ArrayList<>();
        List<String> expectedRowValues = new ArrayList<>();

        // Data for partition (2025, '01', 15, true) — also the expected result of tests 3 and 5.
        for (int i = 1; i <= 3; i++) {
            rows.add(row(i, "name" + i, 2025, "01", 15L, true));
            expectedRowValues.add(String.format("+I[%d, name%d, 2025, 01, 15, true]", i, i));
        }

        // Data for partition (2025, '02', 20, false)
        for (int i = 4; i <= 6; i++) {
            rows.add(row(i, "name" + i, 2025, "02", 20L, false));
        }

        // Data for partition (2026, '01', 10, true)
        for (int i = 7; i <= 9; i++) {
            rows.add(row(i, "name" + i, 2026, "01", 10L, true));
        }

        writeRows(conn, tablePath, rows, false);
        // Partition directory names join the partition values with '$'.
        waitUntilAllBucketFinishSnapshot(
                admin,
                tablePath,
                Arrays.asList("2025$01$15$true", "2025$02$20$false", "2026$01$10$true"));

        // Test 1: Filter by integer partition field
        String plan1 =
                tEnv.explainSql("select * from multi_type_partitioned_table where year_val = 2025");
        assertThat(plan1)
                .contains(
                        "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(year_val, 2025)], project=[id, name, month_val, day_val, is_active]]], fields=[id, name, month_val, day_val, is_active])");

        org.apache.flink.util.CloseableIterator<Row> rowIter1 =
                tEnv.executeSql("select * from multi_type_partitioned_table where year_val = 2025")
                        .collect();

        List<String> expected1 = new ArrayList<>();
        expected1.addAll(expectedRowValues); // Data from (2025, '01', 15, true)
        for (int i = 4; i <= 6; i++) {
            expected1.add(String.format("+I[%d, name%d, 2025, 02, 20, false]", i, i));
        }
        assertResultsIgnoreOrder(rowIter1, expected1, true);

        // Test 2: Filter by string partition field
        String plan2 =
                tEnv.explainSql(
                        "select * from multi_type_partitioned_table where month_val = '01'");
        assertThat(plan2)
                .contains(
                        "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(month_val, _UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], project=[id, name, year_val, day_val, is_active]]], fields=[id, name, year_val, day_val, is_active])");

        org.apache.flink.util.CloseableIterator<Row> rowIter2 =
                tEnv.executeSql("select * from multi_type_partitioned_table where month_val = '01'")
                        .collect();

        List<String> expected2 = new ArrayList<>();
        expected2.addAll(expectedRowValues); // Data from (2025, '01', 15, true)
        for (int i = 7; i <= 9; i++) {
            expected2.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", i, i));
        }
        assertResultsIgnoreOrder(rowIter2, expected2, true);

        // Test 3: Filter by bigint partition field
        String plan3 =
                tEnv.explainSql("select * from multi_type_partitioned_table where day_val = 15");
        assertThat(plan3)
                .contains(
                        "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[=(day_val, 15:BIGINT)], project=[id, name, year_val, month_val, is_active]]], fields=[id, name, year_val, month_val, is_active])");

        org.apache.flink.util.CloseableIterator<Row> rowIter3 =
                tEnv.executeSql("select * from multi_type_partitioned_table where day_val = 15")
                        .collect();
        assertResultsIgnoreOrder(rowIter3, expectedRowValues, true);

        // Test 4: Filter by boolean partition field
        // NOTE(review): the plan is explained for "is_active is true" but the executed query
        // below uses "is_active = true". Both select the same partitions here (is_active is a
        // non-null partition key), but asserting plan and result on the same query would be
        // tighter — confirm this asymmetry is intentional.
        String plan4 =
                tEnv.explainSql(
                        "select * from multi_type_partitioned_table where is_active is true");
        assertThat(plan4)
                .contains(
                        "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[is_active]]], fields=[id, name, year_val, month_val, day_val, is_active]");

        org.apache.flink.util.CloseableIterator<Row> rowIter4 =
                tEnv.executeSql("select * from multi_type_partitioned_table where is_active = true")
                        .collect();

        List<String> expected4 = new ArrayList<>();
        expected4.addAll(expectedRowValues); // Data from (2025, '01', 15, true)
        for (int i = 7; i <= 9; i++) {
            expected4.add(String.format("+I[%d, name%d, 2026, 01, 10, true]", i, i));
        }
        assertResultsIgnoreOrder(rowIter4, expected4, true);

        // Test 5: Complex filter with multiple partition fields (conjunction of all three types)
        String plan5 =
                tEnv.explainSql(
                        "select * from multi_type_partitioned_table where year_val = 2025 and month_val = '01' and is_active = true");
        assertThat(plan5)
                .contains(
                        "TableSourceScan(table=[[testcatalog, defaultdb, multi_type_partitioned_table, filter=[and(and(=(year_val, 2025), =(month_val, _UTF-16LE'01':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), is_active)], project=[id, name, day_val, is_active]]], fields=[id, name, day_val, is_active])");

        org.apache.flink.util.CloseableIterator<Row> rowIter5 =
                tEnv.executeSql(
                                "select * from multi_type_partitioned_table where year_val = 2025 and month_val = '01' and is_active = true")
                        .collect();
        assertResultsIgnoreOrder(rowIter5, expectedRowValues, true);
    }
+
     @Test
     void testStreamingReadMultiPartitionPushDown() throws Exception {
         tEnv.executeSql(
@@ -979,7 +1239,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                         .filter(s -> s.contains("2025"))
                         .collect(Collectors.toList());
         waitUntilAllBucketFinishSnapshot(
-                admin, tablePath, Arrays.asList("2025$1", "2025$2", "2025$2"));
+                admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
 
         String plan = tEnv.explainSql("select * from multi_partitioned_table 
where c ='2025'");
         assertThat(plan)
@@ -1021,7 +1281,7 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
     }
 
     @Test
-    void testStreamingReadWithCombinedFilters() throws Exception {
+    void testStreamingReadWithCombinedFilters1() throws Exception {
         tEnv.executeSql(
                 "create table combined_filters_table"
                         + " (a int not null, b varchar, c string, d int, 
primary key (a, c) NOT ENFORCED) partitioned by (c) ");
@@ -1030,12 +1290,16 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         tEnv.executeSql("alter table combined_filters_table add partition 
(c=2026)");
 
         List<InternalRow> rows = new ArrayList<>();
-        List<String> expectedRowValues = new ArrayList<>();
+        List<String> expectedRowValues1 = new ArrayList<>();
+        List<String> expectedRowValues2 = new ArrayList<>();
 
         for (int i = 0; i < 10; i++) {
             rows.add(row(i, "v" + i, "2025", i * 100));
             if (i % 2 == 0) {
-                expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 
100));
+                expectedRowValues1.add(String.format("+I[%d, 2025, %d]", i, i 
* 100));
+            }
+            if (i == 2) {
+                expectedRowValues2.add(String.format("+I[%d, 2025, %d]", i, i 
* 100));
             }
         }
         writeRows(conn, tablePath, rows, false);
@@ -1062,7 +1326,24 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                                 "select a,c,d from combined_filters_table 
where c ='2025' and d % 200 = 0")
                         .collect();
 
-        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+        assertResultsIgnoreOrder(rowIter, expectedRowValues1, true);
+
+        plan =
+                tEnv.explainSql(
+                        "select a,c,d from combined_filters_table where c 
='2025' and d = 200");
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
combined_filters_table, "
+                                + "filter=[=(c, 
_UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
+                                + "project=[a, d]]], fields=[a, d])");
+
+        // test column filter, partition filter, and flink runtime filter
+        rowIter =
+                tEnv.executeSql(
+                                "select a,c,d from combined_filters_table 
where c ='2025' and d = 200")
+                        .collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues2, true);
     }
 
     @Test
@@ -1095,8 +1376,9 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
             values[1] = keyValuePairs[1].split("=")[1];
 
             for (int i = 0; i < 10; i++) {
-                rows.add(row(i, "v1", values[0], values[1]));
-                expectedRowValues.add(String.format("+I[%d, v1, %s, %s]", i, 
values[0], values[1]));
+                rows.add(row(i, "v" + i, values[0], values[1]));
+                expectedRowValues.add(
+                        String.format("+I[%d, v%d, %s, %s]", i, i, values[0], 
values[1]));
             }
         }
 
@@ -1105,6 +1387,183 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
         return expectedRowValues;
     }
 
+    @Test
+    void testStreamingReadPartitionPushDownWithInExpr() throws Exception {
+
+        tEnv.executeSql(
+                "create table partitioned_table_in"
+                        + " (a int not null, b varchar, c string, primary key 
(a, c) NOT ENFORCED) partitioned by (c) ");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_in");
+        tEnv.executeSql("alter table partitioned_table_in add partition 
(c=2025)");
+        tEnv.executeSql("alter table partitioned_table_in add partition 
(c=2026)");
+        tEnv.executeSql("alter table partitioned_table_in add partition 
(c=2027)");
+
+        List<String> expectedRowValues =
+                writeRowsToPartition(conn, tablePath, Arrays.asList("2025", 
"2026", "2027"))
+                        .stream()
+                        .filter(s -> s.contains("2025") || s.contains("2026"))
+                        .collect(Collectors.toList());
+        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026", "2027"));
+
+        String query1 = "select * from partitioned_table_in where c in 
('2025','2026')";
+        String plan = tEnv.explainSql(query1);
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, 
_UTF-16LE'2026'))]]], fields=[a, b, c])");
+
+        org.apache.flink.util.CloseableIterator<Row> rowIter = 
tEnv.executeSql(query1).collect();
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+
+        String query2 = "select * from partitioned_table_in where c ='2025' or 
 c ='2026'";
+        plan = tEnv.explainSql(query2);
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025':VARCHAR(2147483647) 
CHARACTER SET \"UTF-16LE\"), =(c, _UTF-16LE'2026':VARCHAR(2147483647) CHARACTER 
SET \"UTF-16LE\"))]]], fields=[a, b, c])");
+        rowIter = tEnv.executeSql(query2).collect();
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+    }
+
+    @Test
+    void testStreamingReadWithCombinedFiltersAndInExpr() throws Exception {
+        tEnv.executeSql(
+                "create table combined_filters_table_in"
+                        + " (a int not null, b varchar, c string, d int, 
primary key (a, c) NOT ENFORCED) partitioned by (c) ");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"combined_filters_table_in");
+        tEnv.executeSql("alter table combined_filters_table_in add partition 
(c=2025)");
+        tEnv.executeSql("alter table combined_filters_table_in add partition 
(c=2026)");
+        tEnv.executeSql("alter table combined_filters_table_in add partition 
(c=2027)");
+
+        List<InternalRow> rows = new ArrayList<>();
+        List<String> expectedRowValues = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            rows.add(row(i, "v" + i, "2025", i * 100));
+            if (i % 2 == 0) {
+                expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 
100));
+            }
+        }
+        for (int i = 0; i < 10; i++) {
+            rows.add(row(i, "v" + i, "2026", i * 100));
+            if (i % 2 == 0) {
+                expectedRowValues.add(String.format("+I[%d, 2026, %d]", i, i * 
100));
+            }
+        }
+
+        for (int i = 0; i < 10; i++) {
+            rows.add(row(i, "v" + i, "2027", i * 100));
+        }
+
+        writeRows(conn, tablePath, rows, false);
+        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026", "2027"));
+
+        String query1 =
+                "select a,c,d from combined_filters_table_in where c in 
('2025','2026') and d % 200 = 0";
+        String plan = tEnv.explainSql(query1);
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
combined_filters_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, 
_UTF-16LE'2026'))], project=[a, c, d]]], fields=[a, c, d])");
+
+        // test column filter, partition filter, and flink runtime filter
+        org.apache.flink.util.CloseableIterator<Row> rowIter = 
tEnv.executeSql(query1).collect();
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+
+        rowIter =
+                tEnv.executeSql(
+                                "select a,c,d from combined_filters_table_in 
where (c ='2025' or  c ='2026') "
+                                        + "and d % 200 = 0")
+                        .collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+    }
+
+    @Test
+    void testStreamingReadPartitionPushDownWithLikeExpr() throws Exception {
+
+        tEnv.executeSql(
+                "create table partitioned_table_like"
+                        + " (a int not null, b varchar, c string, primary key 
(a, c) NOT ENFORCED) partitioned by (c) ");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"partitioned_table_like");
+        tEnv.executeSql("alter table partitioned_table_like add partition 
(c=2025)");
+        tEnv.executeSql("alter table partitioned_table_like add partition 
(c=2026)");
+        tEnv.executeSql("alter table partitioned_table_like add partition 
(c=3026)");
+
+        List<String> allData =
+                writeRowsToPartition(conn, tablePath, Arrays.asList("2025", 
"2026", "3026"));
+        List<String> expectedRowValues =
+                allData.stream()
+                        .filter(s -> s.contains("2025") || s.contains("2026"))
+                        .collect(Collectors.toList());
+        waitUntilAllBucketFinishSnapshot(admin, tablePath, 
Arrays.asList("2025", "2026", "3026"));
+
+        String query1 = "select * from partitioned_table_like where c like 
'202%'";
+        String plan = tEnv.explainSql(query1);
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_like, filter=[LIKE(c, _UTF-16LE'202%')]]], fields=[a, b, 
c])");
+
+        org.apache.flink.util.CloseableIterator<Row> rowIter = 
tEnv.executeSql(query1).collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+        expectedRowValues =
+                allData.stream()
+                        .filter(s -> s.contains("2026") || s.contains("3026"))
+                        .collect(Collectors.toList());
+        String query2 = "select * from partitioned_table_like where c like 
'%026'";
+        plan = tEnv.explainSql(query2);
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_like, filter=[LIKE(c, _UTF-16LE'%026')]]], fields=[a, b, 
c])");
+        rowIter = tEnv.executeSql(query2).collect();
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+
+        expectedRowValues =
+                allData.stream().filter(s -> 
s.contains("3026")).collect(Collectors.toList());
+        String query3 = "select * from partitioned_table_like where c like 
'%3026%'";
+        plan = tEnv.explainSql(query3);
+        assertThat(plan)
+                .contains(
+                        "TableSourceScan(table=[[testcatalog, defaultdb, 
partitioned_table_like, filter=[LIKE(c, _UTF-16LE'%3026%')]]], fields=[a, b, 
c])");
+        rowIter = tEnv.executeSql(query3).collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+    }
+
+    @Test
+    void testStreamingReadPartitionComplexPushDown() throws Exception {
+
+        tEnv.executeSql(
+                "create table partitioned_table_complex"
+                        + " (a int not null, b varchar, c string,d string, 
primary key (a, c, d) NOT ENFORCED) partitioned by (c,d) ");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"partitioned_table_complex");
+        tEnv.executeSql("alter table partitioned_table_complex add partition 
(c=2025,d=1)");
+        tEnv.executeSql("alter table partitioned_table_complex add partition 
(c=2025,d=2)");
+        tEnv.executeSql("alter table partitioned_table_complex add partition 
(c=2026,d=1)");
+
+        List<String> allData =
+                writeRowsToTwoPartition(
+                        tablePath, Arrays.asList("c=2025,d=1", "c=2025,d=2", 
"c=2026,d=1"));
+        List<String> expectedRowValues =
+                allData.stream()
+                        .filter(s -> s.contains("v3") && !s.contains("2025, 
2"))
+                        .collect(Collectors.toList());
+        waitUntilAllBucketFinishSnapshot(
+                admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
+
+        String query =
+                "select * from partitioned_table_complex where  a = 3\n"
+                        + "    and (c in ('2026')  or d like '%1%') "
+                        + "    and     b like '%v3%'";
+        String plan = tEnv.explainSql(query);
+        assertThat(plan)
+                .contains(
+                        "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND 
LIKE(b, '%v3%'))])\n"
+                                + "+- TableSourceScan(table=[[testcatalog, 
defaultdb, partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d, 
_UTF-16LE'%1%'))]]], fields=[a, b, c, d])");
+
+        org.apache.flink.util.CloseableIterator<Row> rowIter = 
tEnv.executeSql(query).collect();
+
+        assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
+    }
+
     private enum Caching {
         ENABLE_CACHE,
         DISABLE_CACHE
@@ -1261,6 +1720,8 @@ abstract class FlinkTableSourceITCase extends 
AbstractTestBase {
                     for (String partition : partitions) {
                         KvSnapshots snapshots =
                                 admin.getLatestKvSnapshots(tablePath, 
partition).get();
+                        List<PartitionInfo> partitionInfos =
+                                admin.listPartitionInfos(tablePath).get();
                         for (int bucketId : snapshots.getBucketIds()) {
                             if 
(!snapshots.getSnapshotId(bucketId).isPresent()) {
                                 return false;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 7218f3891..5dc4af470 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -97,7 +97,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            null);
 
             enumerator.start();
 
@@ -144,7 +144,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            null);
             enumerator.start();
             // register all read
             for (int i = 0; i < numSubtasks; i++) {
@@ -215,7 +215,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            null);
 
             enumerator.start();
 
@@ -261,7 +261,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            null);
 
             enumerator.start();
 
@@ -297,7 +297,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            null);
 
             enumerator.start();
 
@@ -357,7 +357,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.earliest(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList(),
+                            null,
                             null);
 
             enumerator.start();
@@ -401,7 +401,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                                 OffsetsInitializer.full(),
                                 DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                                 streaming,
-                                Collections.emptyList())) {
+                                null)) {
             Map<Long, String> partitionNameByIds =
                     waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
             enumerator.start();
@@ -516,7 +516,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                                 OffsetsInitializer.full(),
                                 DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                                 streaming,
-                                Collections.emptyList())) {
+                                null)) {
 
             // test splits for same non-partitioned bucket, should assign to 
same task
             TableBucket t1 = new TableBucket(tableId, 0);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index 2f0b47a4e..633443a08 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -257,11 +257,11 @@ public class FlinkTestBase extends AbstractTestBase {
     }
 
     public static List<String> writeRowsToPartition(
-            Connection connection, TablePath tablePath, Collection<String> 
partitions)
+            Connection connection, TablePath tablePath, Collection<Object> 
partitions)
             throws Exception {
         List<InternalRow> rows = new ArrayList<>();
         List<String> expectedRowValues = new ArrayList<>();
-        for (String partition : partitions) {
+        for (Object partition : partitions) {
             for (int i = 0; i < 10; i++) {
                 rows.add(row(i, "v1", partition));
                 expectedRowValues.add(String.format("+I[%d, v1, %s]", i, 
partition));
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java
new file mode 100644
index 000000000..13eaf5b86
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/PredicateConverterTest.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.predicate.SimpleColStats;
+import org.apache.fluss.predicate.SimpleColStatsTestUtils;
+import org.apache.fluss.predicate.UnsupportedExpression;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link PredicateConverter}. */
+public class PredicateConverterTest {
+
+    private static final PredicateBuilder BUILDER =
+            new PredicateBuilder(
+                    FlinkConversions.toFlussRowType(
+                            new RowType(
+                                    Arrays.asList(
+                                            new RowType.RowField("long1", new 
BigIntType()),
+                                            new RowType.RowField("double1", 
new DoubleType()),
+                                            new RowType.RowField(
+                                                    "string1",
+                                                    
DataTypes.STRING().getLogicalType())))));
+
+    private static final PredicateConverter CONVERTER = new 
PredicateConverter(BUILDER);
+
+    @MethodSource("provideResolvedExpression")
+    @ParameterizedTest
+    public void testVisitAndAutoTypeInference(ResolvedExpression expression, 
Predicate expected) {
+        if (expression instanceof CallExpression) {
+            assertThat(CONVERTER.visit((CallExpression) expression).toString())
+                    .isEqualTo(expected.toString());
+        } else {
+            assertThatThrownBy(() -> CONVERTER.visit(expression))
+                    .isInstanceOf(UnsupportedExpression.class);
+        }
+    }
+
+    public static Stream<Arguments> provideResolvedExpression() {
+        FieldReferenceExpression longRefExpr =
+                new FieldReferenceExpression(
+                        "long1", DataTypes.BIGINT(), Integer.MAX_VALUE, 
Integer.MAX_VALUE);
+        ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
+        ValueLiteralExpression intLitExpr2 = new ValueLiteralExpression(20);
+        long longLit = 10L;
+        ValueLiteralExpression nullLongLitExpr =
+                new ValueLiteralExpression(null, DataTypes.BIGINT());
+
+        FieldReferenceExpression doubleRefExpr =
+                new FieldReferenceExpression(
+                        "double1", DataTypes.DOUBLE(), Integer.MAX_VALUE, 
Integer.MAX_VALUE);
+        ValueLiteralExpression floatLitExpr = new 
ValueLiteralExpression(3.14f);
+        double doubleLit = 3.14d;
+
+        FieldReferenceExpression stringRefExpr =
+                new FieldReferenceExpression(
+                        "string1", DataTypes.STRING(), Integer.MAX_VALUE, 
Integer.MAX_VALUE);
+        String stringLit = "haha";
+        // same type
+        ValueLiteralExpression stringLitExpr1 =
+                new ValueLiteralExpression(stringLit, 
DataTypes.STRING().notNull());
+        // different type, char(4)
+        ValueLiteralExpression stringLitExpr2 = new 
ValueLiteralExpression(stringLit);
+
+        return Stream.of(
+                Arguments.of(longRefExpr, null),
+                Arguments.of(intLitExpr, null),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.IS_NULL,
+                                Collections.singletonList(longRefExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.isNull(0)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.IS_NOT_NULL,
+                                Collections.singletonList(doubleRefExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.isNotNull(1)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                // test literal on left
+                                Arrays.asList(intLitExpr, longRefExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.equal(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                Arrays.asList(nullLongLitExpr, longRefExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.equal(0, null)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.NOT_EQUALS,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.notEqual(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.NOT_EQUALS,
+                                Arrays.asList(longRefExpr, nullLongLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.notEqual(0, null)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.GREATER_THAN,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.greaterThan(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.GREATER_THAN,
+                                Arrays.asList(longRefExpr, nullLongLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.greaterThan(0, null)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.greaterOrEqual(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                
BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL,
+                                Arrays.asList(longRefExpr, nullLongLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.greaterOrEqual(0, null)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.LESS_THAN,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.lessThan(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.LESS_THAN,
+                                Arrays.asList(longRefExpr, nullLongLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.lessThan(0, null)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+                                Arrays.asList(longRefExpr, intLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.lessOrEqual(0, longLit)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+                                Arrays.asList(longRefExpr, nullLongLitExpr),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.lessOrEqual(0, null)),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.AND,
+                                Arrays.asList(
+                                        CallExpression.permanent(
+                                                
BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL,
+                                                Arrays.asList(longRefExpr, 
intLitExpr),
+                                                DataTypes.BOOLEAN()),
+                                        CallExpression.permanent(
+                                                
BuiltInFunctionDefinitions.EQUALS,
+                                                Arrays.asList(doubleRefExpr, 
floatLitExpr),
+                                                DataTypes.BOOLEAN())),
+                                DataTypes.BOOLEAN()),
+                        PredicateBuilder.and(
+                                BUILDER.lessOrEqual(0, longLit), 
BUILDER.equal(1, doubleLit))),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.OR,
+                                Arrays.asList(
+                                        CallExpression.permanent(
+                                                
BuiltInFunctionDefinitions.NOT_EQUALS,
+                                                Arrays.asList(longRefExpr, 
intLitExpr),
+                                                DataTypes.BOOLEAN()),
+                                        CallExpression.permanent(
+                                                
BuiltInFunctionDefinitions.EQUALS,
+                                                Arrays.asList(doubleRefExpr, 
floatLitExpr),
+                                                DataTypes.BOOLEAN())),
+                                DataTypes.BOOLEAN()),
+                        PredicateBuilder.or(
+                                BUILDER.notEqual(0, longLit), BUILDER.equal(1, 
doubleLit))),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.IN,
+                                Arrays.asList(
+                                        longRefExpr, intLitExpr, 
nullLongLitExpr, intLitExpr2),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.in(0, Arrays.asList(10L, null, 20L))),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                Arrays.asList(stringLitExpr1, stringRefExpr),
+                                DataTypes.STRING()),
+                        BUILDER.equal(2, BinaryString.fromString("haha"))),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                Arrays.asList(stringLitExpr2, stringRefExpr),
+                                DataTypes.STRING()),
+                        BUILDER.equal(2, BinaryString.fromString("haha"))),
+                Arguments.of(
+                        CallExpression.permanent(
+                                BuiltInFunctionDefinitions.BETWEEN,
+                                Arrays.asList(longRefExpr, intLitExpr, 
intLitExpr2),
+                                DataTypes.BOOLEAN()),
+                        BUILDER.between(0, 10, 20)));
+    }
+
+    @MethodSource("provideLikeExpressions")
+    @ParameterizedTest
+    public void testStartsWith(
+            CallExpression callExpression,
+            List<Object[]> valuesList,
+            List<Boolean> expectedForValues,
+            List<Long> rowCountList,
+            List<SimpleColStats[]> statsList,
+            List<Boolean> expectedForStats) {
+
+        Predicate predicate =
+                callExpression.accept(
+                        new PredicateConverter(RowType.of(new 
VarCharType(Integer.MAX_VALUE))));
+        IntStream.range(0, valuesList.size())
+                .forEach(
+                        i ->
+                                
assertThat(predicate.test(GenericRow.of(valuesList.get(i))))
+                                        .isEqualTo(expectedForValues.get(i)));
+        IntStream.range(0, rowCountList.size())
+                .forEach(
+                        i ->
+                                assertThat(
+                                                SimpleColStatsTestUtils.test(
+                                                        predicate,
+                                                        rowCountList.get(i),
+                                                        statsList.get(i)))
+                                        .isEqualTo(expectedForStats.get(i)));
+    }
+
+    public static Stream<Arguments> provideLikeExpressions() {
+        CallExpression expr1 =
+                call(
+                        BuiltInFunctionDefinitions.LIKE,
+                        field(0, STRING()),
+                        literal("abd%", STRING()));
+        List<Object[]> valuesList1 =
+                Arrays.asList(
+                        new Object[] {null},
+                        new Object[] {BinaryString.fromString("a")},
+                        new Object[] {BinaryString.fromString("ab")},
+                        new Object[] {BinaryString.fromString("abd")},
+                        new Object[] {BinaryString.fromString("abd%")},
+                        new Object[] {BinaryString.fromString("abd1")},
+                        new Object[] {BinaryString.fromString("abde@")},
+                        new Object[] {BinaryString.fromString("abd_")},
+                        new Object[] {BinaryString.fromString("abd_%")});
+        List<Boolean> expectedForValues1 =
+                Arrays.asList(false, false, false, true, true, true, true, 
true, true);
+        List<Long> rowCountList1 = Arrays.asList(0L, 3L, 3L, 3L);
+        List<SimpleColStats[]> statsList1 =
+                Arrays.asList(
+                        new SimpleColStats[] {new SimpleColStats(null, null, 
0L)},
+                        new SimpleColStats[] {new SimpleColStats(null, null, 
3L)},
+                        new SimpleColStats[] {
+                            new SimpleColStats(
+                                    BinaryString.fromString("ab"),
+                                    BinaryString.fromString("abc123"),
+                                    1L)
+                        },
+                        new SimpleColStats[] {
+                            new SimpleColStats(
+                                    BinaryString.fromString("abc"),
+                                    BinaryString.fromString("abe"),
+                                    1L)
+                        });
+        List<Boolean> expectedForStats1 = Arrays.asList(false, false, false, 
true);
+
+        // currently, SQL wildcards '[]' and '[^]' are deemed as normal 
characters in Flink
+        CallExpression expr3 =
+                call(
+                        BuiltInFunctionDefinitions.LIKE,
+                        field(0, STRING()),
+                        literal("[a-c]xyz%", STRING()));
+        List<Object[]> valuesList3 =
+                Arrays.asList(
+                        new Object[] {BinaryString.fromString("axyz")},
+                        new Object[] {BinaryString.fromString("bxyz")},
+                        new Object[] {BinaryString.fromString("cxyz")},
+                        new Object[] {BinaryString.fromString("[a-c]xyz")});
+        List<Boolean> expectedForValues3 = Arrays.asList(false, false, false, 
true);
+        List<Long> rowCountList3 = Collections.singletonList(3L);
+        List<SimpleColStats[]> statsList3 =
+                Collections.singletonList(
+                        new SimpleColStats[] {
+                            new SimpleColStats(
+                                    BinaryString.fromString("[a-c]xyz"),
+                                    BinaryString.fromString("[a-c]xyzz"),
+                                    0L)
+                        });
+        List<Boolean> expectedForStats3 = Collections.singletonList(true);
+
+        CallExpression expr4 =
+                call(
+                        BuiltInFunctionDefinitions.LIKE,
+                        field(0, STRING()),
+                        literal("[^a-d]xyz%", STRING()));
+        List<Object[]> valuesList4 =
+                Arrays.asList(
+                        new Object[] {BinaryString.fromString("exyz")},
+                        new Object[] {BinaryString.fromString("fxyz")},
+                        new Object[] {BinaryString.fromString("axyz")},
+                        new Object[] {BinaryString.fromString("[^a-d]xyz")});
+        List<Boolean> expectedForValues4 = Arrays.asList(false, false, false, 
true);
+        List<Long> rowCountList4 = Collections.singletonList(3L);
+        List<SimpleColStats[]> statsList4 =
+                Collections.singletonList(
+                        new SimpleColStats[] {
+                            new SimpleColStats(
+                                    BinaryString.fromString("[^a-d]xyz"),
+                                    BinaryString.fromString("[^a-d]xyzz"),
+                                    1L)
+                        });
+        List<Boolean> expectedForStats4 = Collections.singletonList(true);
+
+        return Stream.of(
+                Arguments.of(
+                        expr1,
+                        valuesList1,
+                        expectedForValues1,
+                        rowCountList1,
+                        statsList1,
+                        expectedForStats1),
+                Arguments.of(
+                        expr3,
+                        valuesList3,
+                        expectedForValues3,
+                        rowCountList3,
+                        statsList3,
+                        expectedForStats3),
+                Arguments.of(
+                        expr4,
+                        valuesList4,
+                        expectedForValues4,
+                        rowCountList4,
+                        statsList4,
+                        expectedForStats4));
+    }
+
+    @Test
+    public void testUnsupportedExpression() {
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.AND,
+                        call(
+                                BuiltInFunctionDefinitions.EQUALS,
+                                field(0, DataTypes.INT()),
+                                literal(3)),
+                        call(
+                                BuiltInFunctionDefinitions.SIMILAR,
+                                field(1, DataTypes.INT()),
+                                literal(5)));
+        assertThatThrownBy(
+                        () ->
+                                expression.accept(
+                                        new PredicateConverter(
+                                                RowType.of(new IntType(), new 
IntType()))))
+                .isInstanceOf(UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedStartsPatternForLike() {
+        PredicateConverter converter =
+                new PredicateConverter(RowType.of(new 
VarCharType(Integer.MAX_VALUE)));
+        // starts pattern with '_' as wildcard
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("abc_", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+
+        // starts pattern like 'abc%xyz' or 'abc_xyz'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("abc%xyz", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("abc_xyz", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+
+        // starts pattern like 'abc%xyz' or 'abc_xyz' with '%' or '_' to escape
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "=%abc=%%xyz=_",
+                                                        STRING()), // matches 
"%abc%(?s:.*)xyz_"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=%%xyz",
+                                                        STRING()), // matches 
"abc%(?s:.*)xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=%_xyz",
+                                                        STRING()), // matches 
"abc%.xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=_%xyz",
+                                                        STRING()), // matches 
"abc_(?s:.*)xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal(
+                                                        "abc=__xyz",
+                                                        STRING()), // matches 
"abc_.xyz"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+
+        // starts pattern with wildcard '%' at the beginning to escape
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("=%%", STRING()), // 
matches "%(?s:.*)"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedEqualsPatternForLike() {
+        PredicateConverter converter =
+                new PredicateConverter(RowType.of(new 
VarCharType(Integer.MAX_VALUE)));
+        // equals pattern
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("123456", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+
+        // equals pattern escape '%'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("12=%45", STRING()), 
// equals "12%45"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+
+        // equals pattern escape '_'
+        assertThatThrownBy(
+                        () ->
+                                call(
+                                                
BuiltInFunctionDefinitions.LIKE,
+                                                field(0, STRING()),
+                                                literal("12=_45", STRING()), 
// equals "12_45"
+                                                literal("=", STRING()))
+                                        .accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedType() {
+        PredicateConverter converter =
+                new PredicateConverter(RowType.of(new 
VarCharType(Integer.MAX_VALUE)));
+        DataType structType = 
DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
+        CallExpression expression =
+                call(
+                        BuiltInFunctionDefinitions.EQUALS,
+                        field(0, structType),
+                        literal(Row.of(1), structType));
+        assertThatThrownBy(() -> expression.accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+    }
+
+    @Test
+    public void testUnsupportedFieldReferenceExpression() {
+        PredicateConverter converter =
+                new PredicateConverter(RowType.of(new 
VarCharType(Integer.MAX_VALUE)));
+        DataType structType = 
DataTypes.ROW(DataTypes.INT()).bridgedTo(Row.class);
+        assertThatThrownBy(() -> field(0, structType).accept(converter))
+                .isInstanceOf(UnsupportedExpression.class);
+    }
+
    /**
     * Builds a reference to a column named "f{i}" of the given type. The input/field index
     * constructor arguments are Integer.MAX_VALUE placeholders — presumably the converter
     * resolves fields by name rather than by these indices; TODO confirm.
     */
    private static FieldReferenceExpression field(int i, DataType type) {
        return new FieldReferenceExpression("f" + i, type, Integer.MAX_VALUE, Integer.MAX_VALUE);
    }
+
    /**
     * Builds a BOOLEAN-typed call of {@code function} over {@code args}. The leading
     * {@code false} and {@code null} constructor arguments look like the "temporary" flag and
     * catalog identifier of Flink's CallExpression — NOTE(review): confirm against the Flink
     * version in use.
     */
    private static CallExpression call(FunctionDefinition function, ResolvedExpression... args) {
        return new CallExpression(false, null, function, Arrays.asList(args), DataTypes.BOOLEAN());
    }
+}
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index c4fa15e05..b79232fad 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -394,7 +394,6 @@
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude>
-                                        
<exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude>
                                         <exclude>
                                             
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
                                         </exclude>

Reply via email to