This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f0fbf527 [lake/iceberg] Iceberg read implementation supports filter 
push down (#1715)
9f0fbf527 is described below

commit 9f0fbf527801e75897dfc2b55f7b9c545fdc3f62
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Sep 19 10:19:59 2025 +0800

    [lake/iceberg] Iceberg read implementation supports filter push down (#1715)
---
 .../iceberg/source/FlussRowAsIcebergRecord.java    | 182 ++++++++++++++++++
 .../lake/iceberg/source/IcebergLakeSource.java     |  35 +++-
 .../lake/iceberg/source/IcebergRecordReader.java   |   8 +-
 .../lake/iceberg/source/IcebergSplitPlanner.java   |  20 +-
 .../tiering/FlussRecordAsIcebergRecord.java        | 164 +++-------------
 .../fluss/lake/iceberg/tiering/RecordWriter.java   |   2 +-
 .../utils/FlussToIcebergPredicateConverter.java    | 206 +++++++++++++++++++++
 .../lake/iceberg/utils/IcebergConversions.java     |  48 +++++
 .../flink/FlinkUnionReadLogTableITCase.java        |   5 +-
 .../iceberg/source/IcebergLakeSourceTest.java}     | 129 +++++--------
 .../FlussToIcebergPredicateConverterTest.java      | 171 +++++++++++++++++
 .../lake/iceberg/utils/IcebergConversionsTest.java |   1 -
 .../fluss/lake/paimon/source/PaimonLakeSource.java |   3 +-
 .../lake/paimon/source/PaimonLakeSourceTest.java   |   2 +-
 14 files changed, 739 insertions(+), 237 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
new file mode 100644
index 000000000..805bddcb6
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.BytesType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DateType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimeType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.DateTimeUtils;
+
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Map;
+
+/** Wrap Fluss {@link InternalRow} as Iceberg {@link Record}. */
+public class FlussRowAsIcebergRecord implements Record {
+
+    protected InternalRow internalRow;
+    protected final Types.StructType structType;
+    protected final RowType flussRowType;
+    private final FlussRowToIcebergFieldConverter[] fieldConverters;
+
+    public FlussRowAsIcebergRecord(Types.StructType structType, RowType 
flussRowType) {
+        this.structType = structType;
+        this.flussRowType = flussRowType;
+        fieldConverters = new 
FlussRowToIcebergFieldConverter[flussRowType.getFieldCount()];
+        for (int pos = 0; pos < flussRowType.getFieldCount(); pos++) {
+            DataType flussType = flussRowType.getTypeAt(pos);
+            fieldConverters[pos] = createTypeConverter(flussType, pos);
+        }
+    }
+
+    public FlussRowAsIcebergRecord(
+            Types.StructType structType, RowType flussRowType, InternalRow 
internalRow) {
+        this(structType, flussRowType);
+        this.internalRow = internalRow;
+    }
+
+    @Override
+    public Types.StructType struct() {
+        return structType;
+    }
+
+    @Override
+    public Object getField(String name) {
+        return get(structType.fields().indexOf(structType.field(name)));
+    }
+
+    @Override
+    public void setField(String name, Object value) {
+        throw new UnsupportedOperationException("method setField is not 
supported.");
+    }
+
+    @Override
+    public Object get(int pos) {
+        // handle normal columns
+        if (internalRow.isNullAt(pos)) {
+            return null;
+        }
+        return fieldConverters[pos].convert(internalRow);
+    }
+
+    @Override
+    public Record copy() {
+        throw new UnsupportedOperationException("method copy is not 
supported.");
+    }
+
+    @Override
+    public Record copy(Map<String, Object> overwriteValues) {
+        throw new UnsupportedOperationException("method copy is not 
supported.");
+    }
+
+    @Override
+    public int size() {
+        return structType.fields().size();
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+        Object value = get(pos);
+        if (value == null || javaClass.isInstance(value)) {
+            return javaClass.cast(value);
+        } else {
+            throw new IllegalStateException(
+                    "Not an instance of " + javaClass.getName() + ": " + 
value);
+        }
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+        throw new UnsupportedOperationException("method set is not 
supported.");
+    }
+
+    private interface FlussRowToIcebergFieldConverter {
+        Object convert(InternalRow value);
+    }
+
+    private FlussRowToIcebergFieldConverter createTypeConverter(DataType 
flussType, int pos) {
+        if (flussType instanceof BooleanType) {
+            return row -> row.getBoolean(pos);
+        } else if (flussType instanceof TinyIntType) {
+            return row -> (int) row.getByte(pos);
+        } else if (flussType instanceof SmallIntType) {
+            return row -> (int) row.getShort(pos);
+        } else if (flussType instanceof IntType) {
+            return row -> row.getInt(pos);
+        } else if (flussType instanceof BigIntType) {
+            return row -> row.getLong(pos);
+        } else if (flussType instanceof FloatType) {
+            return row -> row.getFloat(pos);
+        } else if (flussType instanceof DoubleType) {
+            return row -> row.getDouble(pos);
+        } else if (flussType instanceof StringType) {
+            return row -> row.getString(pos).toString();
+        } else if (flussType instanceof CharType) {
+            CharType charType = (CharType) flussType;
+            return row -> row.getChar(pos, charType.getLength()).toString();
+        } else if (flussType instanceof BytesType || flussType instanceof 
BinaryType) {
+            return row -> ByteBuffer.wrap(row.getBytes(pos));
+        } else if (flussType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) flussType;
+            return row ->
+                    row.getDecimal(pos, decimalType.getPrecision(), 
decimalType.getScale())
+                            .toBigDecimal();
+        } else if (flussType instanceof LocalZonedTimestampType) {
+            LocalZonedTimestampType ltzType = (LocalZonedTimestampType) 
flussType;
+            return row ->
+                    toIcebergTimestampLtz(
+                            row.getTimestampLtz(pos, 
ltzType.getPrecision()).toInstant());
+        } else if (flussType instanceof TimestampType) {
+            TimestampType tsType = (TimestampType) flussType;
+            return row -> row.getTimestampNtz(pos, 
tsType.getPrecision()).toLocalDateTime();
+        } else if (flussType instanceof DateType) {
+            return row -> DateTimeUtils.toLocalDate(row.getInt(pos));
+        } else if (flussType instanceof TimeType) {
+            return row -> DateTimeUtils.toLocalTime(row.getInt(pos));
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported data type conversion for Fluss type: "
+                            + flussType.getClass().getSimpleName());
+        }
+    }
+
+    private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
+        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
index 31eee6fb5..d40a8ad39 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
@@ -19,6 +19,7 @@
 package org.apache.fluss.lake.iceberg.source;
 
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.utils.FlussToIcebergPredicateConverter;
 import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.source.LakeSource;
@@ -27,14 +28,18 @@ import org.apache.fluss.lake.source.RecordReader;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.Predicate;
 
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
 
@@ -44,6 +49,7 @@ public class IcebergLakeSource implements 
LakeSource<IcebergSplit> {
     private final Configuration icebergConfig;
     private final TablePath tablePath;
     private @Nullable int[][] project;
+    private @Nullable Expression filter;
 
     public IcebergLakeSource(Configuration icebergConfig, TablePath tablePath) 
{
         this.icebergConfig = icebergConfig;
@@ -62,13 +68,29 @@ public class IcebergLakeSource implements 
LakeSource<IcebergSplit> {
 
     @Override
     public FilterPushDownResult withFilters(List<Predicate> predicates) {
-        // TODO: Support filter push down. #1676
-        return FilterPushDownResult.of(Collections.emptyList(), predicates);
+        List<Predicate> unConsumedPredicates = new ArrayList<>();
+        List<Predicate> consumedPredicates = new ArrayList<>();
+        List<Expression> converted = new ArrayList<>();
+        Schema schema = getSchema(tablePath);
+        for (Predicate predicate : predicates) {
+            Optional<Expression> optPredicate =
+                    FlussToIcebergPredicateConverter.convert(schema, 
predicate);
+            if (optPredicate.isPresent()) {
+                consumedPredicates.add(predicate);
+                converted.add(optPredicate.get());
+            } else {
+                unConsumedPredicates.add(predicate);
+            }
+        }
+        if (!converted.isEmpty()) {
+            filter = converted.stream().reduce(Expressions::and).orElse(null);
+        }
+        return FilterPushDownResult.of(consumedPredicates, 
unConsumedPredicates);
     }
 
     @Override
     public Planner<IcebergSplit> createPlanner(PlannerContext context) throws 
IOException {
-        return new IcebergSplitPlanner(icebergConfig, tablePath, 
context.snapshotId());
+        return new IcebergSplitPlanner(icebergConfig, tablePath, 
context.snapshotId(), filter);
     }
 
     @Override
@@ -82,4 +104,9 @@ public class IcebergLakeSource implements 
LakeSource<IcebergSplit> {
     public SimpleVersionedSerializer<IcebergSplit> getSplitSerializer() {
         return new IcebergSplitSerializer();
     }
+
+    private Schema getSchema(TablePath tablePath) {
+        Catalog catalog = 
IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
+        return catalog.loadTable(toIceberg(tablePath)).schema();
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
index d848ec152..f654fb5d0 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
@@ -45,7 +45,13 @@ import java.util.stream.IntStream;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 
-/** Iceberg record reader. */
+/**
+ * Iceberg record reader. The filter is applied during the plan phase of 
IcebergSplitPlanner, so the
+ * RecordReader does not need to apply the filter again.
+ *
+ * <p>Refer to {@link 
org.apache.iceberg.data.GenericReader#open(FileScanTask)} and {@link
+ * org.apache.iceberg.Scan#ignoreResiduals()} for details.
+ */
 public class IcebergRecordReader implements RecordReader {
     protected IcebergRecordAsFlussRecordIterator iterator;
     protected @Nullable int[][] project;
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
index 89a5dc269..4376108ba 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
@@ -27,9 +27,13 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,11 +51,14 @@ public class IcebergSplitPlanner implements 
Planner<IcebergSplit> {
     private final Configuration icebergConfig;
     private final TablePath tablePath;
     private final long snapshotId;
+    private final @Nullable Expression filter;
 
-    public IcebergSplitPlanner(Configuration icebergConfig, TablePath 
tablePath, long snapshotId) {
+    public IcebergSplitPlanner(
+            Configuration icebergConfig, TablePath tablePath, long snapshotId, 
Expression filter) {
         this.icebergConfig = icebergConfig;
         this.tablePath = tablePath;
         this.snapshotId = snapshotId;
+        this.filter = filter;
     }
 
     @Override
@@ -61,12 +68,11 @@ public class IcebergSplitPlanner implements 
Planner<IcebergSplit> {
         Table table = catalog.loadTable(toIceberg(tablePath));
         Function<FileScanTask, List<String>> partitionExtract = 
createPartitionExtractor(table);
         Function<FileScanTask, Integer> bucketExtractor = 
createBucketExtractor(table);
-        try (CloseableIterable<FileScanTask> tasks =
-                table.newScan()
-                        .useSnapshot(snapshotId)
-                        .includeColumnStats()
-                        .ignoreResiduals()
-                        .planFiles()) {
+        TableScan tableScan = 
table.newScan().useSnapshot(snapshotId).includeColumnStats();
+        if (filter != null) {
+            tableScan = tableScan.filter(filter);
+        }
+        try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
             tasks.forEach(
                     task ->
                             splits.add(
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
index 0748c89c3..6c2296152 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
@@ -17,38 +17,21 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
+import org.apache.fluss.lake.iceberg.source.FlussRowAsIcebergRecord;
 import org.apache.fluss.record.LogRecord;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.types.BigIntType;
-import org.apache.fluss.types.BinaryType;
-import org.apache.fluss.types.BooleanType;
-import org.apache.fluss.types.BytesType;
-import org.apache.fluss.types.CharType;
-import org.apache.fluss.types.DataType;
-import org.apache.fluss.types.DateType;
-import org.apache.fluss.types.DecimalType;
-import org.apache.fluss.types.DoubleType;
-import org.apache.fluss.types.FloatType;
-import org.apache.fluss.types.IntType;
-import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.RowType;
-import org.apache.fluss.types.SmallIntType;
-import org.apache.fluss.types.StringType;
-import org.apache.fluss.types.TimeType;
-import org.apache.fluss.types.TimestampType;
-import org.apache.fluss.types.TinyIntType;
-import org.apache.fluss.utils.DateTimeUtils;
 
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types;
 
-import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
-import java.util.Map;
 
+import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkState;
 
 /**
@@ -57,24 +40,21 @@ import static 
org.apache.fluss.utils.Preconditions.checkState;
  * <p>todo: refactor to implement ParquetWriters, OrcWriters, AvroWriters just 
like Flink & Spark
  * write to iceberg for higher performance
  */
-public class FlussRecordAsIcebergRecord implements Record {
+public class FlussRecordAsIcebergRecord extends FlussRowAsIcebergRecord {
 
     // Lake table for iceberg will append three system columns: __bucket, 
__offset,__timestamp
-    private static final int LAKE_ICEBERG_SYSTEM_COLUMNS = 3;
+    private static final int LAKE_ICEBERG_SYSTEM_COLUMNS = 
SYSTEM_COLUMNS.size();
 
     private LogRecord logRecord;
     private final int bucket;
-    private final Schema icebergSchema;
-    private final RowType flussRowType;
 
     // the origin row fields in fluss, excluding the system columns in iceberg
     private int originRowFieldCount;
-    private InternalRow internalRow;
 
-    public FlussRecordAsIcebergRecord(int bucket, Schema icebergSchema, 
RowType flussRowType) {
+    public FlussRecordAsIcebergRecord(
+            int bucket, Types.StructType structType, RowType flussRowType) {
+        super(structType, flussRowType);
         this.bucket = bucket;
-        this.icebergSchema = icebergSchema;
-        this.flussRowType = flussRowType;
     }
 
     public void setFlussRecord(LogRecord logRecord) {
@@ -82,24 +62,25 @@ public class FlussRecordAsIcebergRecord implements Record {
         this.internalRow = logRecord.getRow();
         this.originRowFieldCount = internalRow.getFieldCount();
         checkState(
-                originRowFieldCount
-                        == icebergSchema.asStruct().fields().size() - 
LAKE_ICEBERG_SYSTEM_COLUMNS,
+                originRowFieldCount == structType.fields().size() - 
LAKE_ICEBERG_SYSTEM_COLUMNS,
                 "The Iceberg table fields count must equals to LogRecord's 
fields count.");
     }
 
-    @Override
-    public Types.StructType struct() {
-        return icebergSchema.asStruct();
-    }
-
     @Override
     public Object getField(String name) {
-        return icebergSchema;
-    }
-
-    @Override
-    public void setField(String name, Object value) {
-        throw new UnsupportedOperationException("method setField is not 
supported.");
+        if (SYSTEM_COLUMNS.containsKey(name)) {
+            switch (name) {
+                case BUCKET_COLUMN_NAME:
+                    return bucket;
+                case OFFSET_COLUMN_NAME:
+                    return logRecord.logOffset();
+                case TIMESTAMP_COLUMN_NAME:
+                    return toIcebergTimestampLtz(logRecord.timestamp());
+                default:
+                    throw new IllegalArgumentException("Unknown system column: 
" + name);
+            }
+        }
+        return super.getField(name);
     }
 
     @Override
@@ -113,103 +94,12 @@ public class FlussRecordAsIcebergRecord implements Record 
{
             return logRecord.logOffset();
         } else if (pos == originRowFieldCount + 2) {
             // timestamp column
-            return getTimestampLtz(logRecord.timestamp());
+            return toIcebergTimestampLtz(logRecord.timestamp());
         }
-
-        // handle normal columns
-        if (internalRow.isNullAt(pos)) {
-            return null;
-        }
-
-        DataType dataType = flussRowType.getTypeAt(pos);
-        if (dataType instanceof BooleanType) {
-            return internalRow.getBoolean(pos);
-        } else if (dataType instanceof TinyIntType) {
-            return (int) internalRow.getByte(pos);
-        } else if (dataType instanceof SmallIntType) {
-            return (int) internalRow.getShort(pos);
-        } else if (dataType instanceof IntType) {
-            return internalRow.getInt(pos);
-        } else if (dataType instanceof BigIntType) {
-            return internalRow.getLong(pos);
-        } else if (dataType instanceof FloatType) {
-            return internalRow.getFloat(pos);
-        } else if (dataType instanceof DoubleType) {
-            return internalRow.getDouble(pos);
-        } else if (dataType instanceof StringType) {
-            return internalRow.getString(pos).toString();
-        } else if (dataType instanceof CharType) {
-            CharType charType = (CharType) dataType;
-            return internalRow.getChar(pos, charType.getLength()).toString();
-        } else if (dataType instanceof BytesType) {
-            return ByteBuffer.wrap(internalRow.getBytes(pos));
-        } else if (dataType instanceof BinaryType) {
-            // Iceberg's Record interface expects ByteBuffer for binary types.
-            return ByteBuffer.wrap(internalRow.getBytes(pos));
-        } else if (dataType instanceof DecimalType) {
-            // Iceberg expects BigDecimal for decimal types.
-            DecimalType decimalType = (DecimalType) dataType;
-            return internalRow
-                    .getDecimal(pos, decimalType.getPrecision(), 
decimalType.getScale())
-                    .toBigDecimal();
-        } else if (dataType instanceof LocalZonedTimestampType) {
-            // Iceberg expects OffsetDateTime for timestamp with local 
timezone.
-            return getTimestampLtz(
-                    internalRow
-                            .getTimestampLtz(
-                                    pos, ((LocalZonedTimestampType) 
dataType).getPrecision())
-                            .toInstant());
-        } else if (dataType instanceof TimestampType) {
-            // Iceberg expects LocalDateType for timestamp without local 
timezone.
-            return internalRow
-                    .getTimestampNtz(pos, ((TimestampType) 
dataType).getPrecision())
-                    .toLocalDateTime();
-        } else if (dataType instanceof DateType) {
-            return DateTimeUtils.toLocalDate(internalRow.getInt(pos));
-        } else if (dataType instanceof TimeType) {
-            return DateTimeUtils.toLocalTime(internalRow.getInt(pos));
-        }
-        throw new UnsupportedOperationException(
-                "Unsupported data type conversion for Fluss type: "
-                        + dataType.getClass().getName());
+        return super.get(pos);
     }
 
-    private OffsetDateTime getTimestampLtz(long timestamp) {
+    private OffsetDateTime toIcebergTimestampLtz(long timestamp) {
         return OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), 
ZoneOffset.UTC);
     }
-
-    private OffsetDateTime getTimestampLtz(Instant instant) {
-        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
-    }
-
-    @Override
-    public Record copy() {
-        throw new UnsupportedOperationException("method copy is not 
supported.");
-    }
-
-    @Override
-    public Record copy(Map<String, Object> overwriteValues) {
-        throw new UnsupportedOperationException("method copy is not 
supported.");
-    }
-
-    @Override
-    public int size() {
-        return icebergSchema.asStruct().fields().size();
-    }
-
-    @Override
-    public <T> T get(int pos, Class<T> javaClass) {
-        Object value = get(pos);
-        if (value == null || javaClass.isInstance(value)) {
-            return javaClass.cast(value);
-        } else {
-            throw new IllegalStateException(
-                    "Not an instance of " + javaClass.getName() + ": " + 
value);
-        }
-    }
-
-    @Override
-    public <T> void set(int pos, T value) {
-        throw new UnsupportedOperationException("method set is not 
supported.");
-    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
index 6a78cec15..238ef5be9 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/RecordWriter.java
@@ -44,7 +44,7 @@ public abstract class RecordWriter implements AutoCloseable {
         this.bucket = tableBucket.getBucket();
         this.flussRecordAsIcebergRecord =
                 new FlussRecordAsIcebergRecord(
-                        tableBucket.getBucket(), icebergSchema, flussRowType);
+                        tableBucket.getBucket(), icebergSchema.asStruct(), 
flussRowType);
     }
 
     public abstract void write(LogRecord record) throws Exception;
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverter.java
new file mode 100644
index 000000000..77d50bb96
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.utils;
+
+import org.apache.fluss.predicate.And;
+import org.apache.fluss.predicate.CompoundPredicate;
+import org.apache.fluss.predicate.FieldRef;
+import org.apache.fluss.predicate.FunctionVisitor;
+import org.apache.fluss.predicate.LeafPredicate;
+import org.apache.fluss.predicate.Or;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateVisitor;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIcebergLiteral;
+
+/**
+ * Converts a Fluss {@link org.apache.fluss.predicate.Predicate} into an 
Iceberg {@link Expression}.
+ *
+ * <p>This class implements the {@link PredicateVisitor} pattern to traverse a 
tree of Fluss
+ * predicates. It handles both leaf-level conditions (like equals, greater 
than) and compound
+ * conditions (AND, OR).
+ */
+public class FlussToIcebergPredicateConverter implements 
PredicateVisitor<Expression> {
+
+    private final Schema icebergSchema;
+    private final LeafFunctionConverter converter = new 
LeafFunctionConverter();
+
+    public FlussToIcebergPredicateConverter(Schema schema) {
+        this.icebergSchema = schema;
+    }
+
+    public static Optional<Expression> convert(Schema schema, Predicate 
flussPredicate) {
+        try {
+            return Optional.of(flussPredicate.visit(new 
FlussToIcebergPredicateConverter(schema)));
+        } catch (UnsupportedOperationException e) {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public Expression visit(LeafPredicate predicate) {
+        // Delegate the conversion of the specific function to a dedicated 
visitor.
+        // This avoids a long chain of 'if-instanceof' checks.
+        return predicate.visit(converter);
+    }
+
+    @Override
+    public Expression visit(CompoundPredicate predicate) {
+        List<Expression> children =
+                predicate.children().stream().map(p -> 
p.visit(this)).collect(Collectors.toList());
+
+        CompoundPredicate.Function function = predicate.function();
+        if (function instanceof And) {
+            return 
children.stream().reduce(Expressions::and).orElse(Expressions.alwaysTrue());
+        } else if (function instanceof Or) {
+            return 
children.stream().reduce(Expressions::or).orElse(Expressions.alwaysTrue());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported fluss compound predicate function: " + 
predicate.function());
+        }
+    }
+
+    /**
+     * A visitor that implements the logic to convert each type of {@link
+     * org.apache.fluss.predicate.LeafFunction} to an Iceberg {@link 
Expression}.
+     */
+    private class LeafFunctionConverter implements FunctionVisitor<Expression> 
{
+
+        @Override
+        public Expression visitIsNotNull(FieldRef fieldRef) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.notNull(fieldName);
+        }
+
+        @Override
+        public Expression visitIsNull(FieldRef fieldRef) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.isNull(fieldName);
+        }
+
+        @Override
+        public Expression visitStartsWith(FieldRef fieldRef, Object literal) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.startsWith(
+                    fieldName, convertToIcebergLiteral(fieldRef.index(), 
literal).toString());
+        }
+
+        @Override
+        public Expression visitEndsWith(FieldRef fieldRef, Object literal) {
+            // Iceberg does not support an endsWith filter expression
+            throw new UnsupportedOperationException("Iceberg not supported 
endswith filter.");
+        }
+
+        @Override
+        public Expression visitContains(FieldRef fieldRef, Object literal) {
+            // Iceberg does not support a contains filter expression
+            throw new UnsupportedOperationException("Iceberg not supported 
contains filter.");
+        }
+
+        @Override
+        public Expression visitLessThan(FieldRef fieldRef, Object literal) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.lessThan(
+                    fieldName, convertToIcebergLiteral(fieldRef.index(), 
literal));
+        }
+
+        @Override
+        public Expression visitGreaterOrEqual(FieldRef fieldRef, Object 
literal) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.greaterThanOrEqual(
+                    fieldName, convertToIcebergLiteral(fieldRef.index(), 
literal));
+        }
+
+        @Override
+        public Expression visitNotEqual(FieldRef fieldRef, Object literal) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.notEqual(
+                    fieldName, convertToIcebergLiteral(fieldRef.index(), 
literal));
+        }
+
+        @Override
+        public Expression visitLessOrEqual(FieldRef fieldRef, Object literal) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.lessThanOrEqual(
+                    fieldName, convertToIcebergLiteral(fieldRef.index(), 
literal));
+        }
+
+        @Override
+        public Expression visitEqual(FieldRef fieldRef, Object literal) {
+            String fieldName = getField(fieldRef.index()).name();
+            return Expressions.equal(fieldName, 
convertToIcebergLiteral(fieldRef.index(), literal));
+        }
+
+        @Override
+        public Expression visitGreaterThan(FieldRef fieldRef, Object literal) {
+            String fieldName = getField(fieldRef.index()).name();
+            Object icebergLiteral = convertToIcebergLiteral(fieldRef.index(), 
literal);
+            return Expressions.greaterThan(fieldName, icebergLiteral);
+        }
+
+        @Override
+        public Expression visitIn(FieldRef fieldRef, List<Object> literals) {
+            String fieldName = getField(fieldRef.index()).name();
+            List<Object> icebergLiterals =
+                    literals.stream()
+                            .map(literal -> 
convertToIcebergLiteral(fieldRef.index(), literal))
+                            .collect(Collectors.toList());
+            return Expressions.in(fieldName, icebergLiterals);
+        }
+
+        @Override
+        public Expression visitNotIn(FieldRef fieldRef, List<Object> literals) 
{
+            String fieldName = getField(fieldRef.index()).name();
+            List<Object> icebergLiterals =
+                    literals.stream()
+                            .map(literal -> 
convertToIcebergLiteral(fieldRef.index(), literal))
+                            .collect(Collectors.toList());
+            return Expressions.notIn(fieldName, icebergLiterals);
+        }
+
+        @Override
+        public Expression visitAnd(List<Expression> children) {
+            // should never be reached: compound predicates are handled in visit(CompoundPredicate)
+            throw new UnsupportedOperationException("Unsupported visitAnd 
method.");
+        }
+
+        @Override
+        public Expression visitOr(List<Expression> children) {
+            // should never be reached: compound predicates are handled in visit(CompoundPredicate)
+            throw new UnsupportedOperationException("Unsupported visitOr 
method.");
+        }
+
+        private Types.NestedField getField(int fieldIndex) {
+            return icebergSchema.columns().get(fieldIndex);
+        }
+
+        private Object convertToIcebergLiteral(int fieldIndex, Object 
flussLiteral) {
+            return toIcebergLiteral(getField(fieldIndex), flussLiteral);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index f12b505b1..4d7582c53 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -17,7 +17,13 @@
 
 package org.apache.fluss.lake.iceberg.utils;
 
+import org.apache.fluss.lake.iceberg.source.FlussRowAsIcebergRecord;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
 
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
@@ -27,6 +33,8 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 
 import javax.annotation.Nullable;
 
@@ -82,4 +90,44 @@ public class IcebergConversions {
         expression = Expressions.and(expression, 
Expressions.equal(BUCKET_COLUMN_NAME, bucket));
         return expression;
     }
+
+    public static Object toIcebergLiteral(Types.NestedField field, Object 
flussLiteral) {
+        InternalRow flussRow = GenericRow.of(flussLiteral);
+        FlussRowAsIcebergRecord flussRowAsIcebergRecord =
+                new FlussRowAsIcebergRecord(
+                        Types.StructType.of(field),
+                        
RowType.of(convertIcebergTypeToFlussType(field.type())),
+                        flussRow);
+        return flussRowAsIcebergRecord.get(0, 
field.type().typeId().javaClass());
+    }
+
+    /** Converts Iceberg data types to Fluss data types. */
+    private static DataType convertIcebergTypeToFlussType(Type icebergType) {
+        if (icebergType instanceof Types.BooleanType) {
+            return DataTypes.BOOLEAN();
+        } else if (icebergType instanceof Types.IntegerType) {
+            return DataTypes.INT();
+        } else if (icebergType instanceof Types.LongType) {
+            return DataTypes.BIGINT();
+        } else if (icebergType instanceof Types.DoubleType) {
+            return DataTypes.DOUBLE();
+        } else if (icebergType instanceof Types.TimeType) {
+            return DataTypes.TIME();
+        } else if (icebergType instanceof Types.TimestampType) {
+            Types.TimestampType timestampType = (Types.TimestampType) 
icebergType;
+            if (timestampType.shouldAdjustToUTC()) {
+                return DataTypes.TIMESTAMP_LTZ();
+            } else {
+                return DataTypes.TIMESTAMP();
+            }
+        } else if (icebergType instanceof Types.StringType) {
+            return DataTypes.STRING();
+        } else if (icebergType instanceof Types.DecimalType) {
+            Types.DecimalType decimalType = (Types.DecimalType) icebergType;
+            return DataTypes.DECIMAL(decimalType.precision(), 
decimalType.scale());
+        }
+        throw new UnsupportedOperationException(
+                "Unsupported data type conversion for Iceberg type: "
+                        + icebergType.getClass().getName());
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 06dd90eec..168f51cd2 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -123,10 +123,11 @@ public class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
             String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
 
             // check if the plan contains partition filter
-            // TODO: push down iceberg partition filter
+            // check filter push down
             assertThat(plan)
                     .contains("TableSourceScan(")
-                    .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + 
partition + "'");
+                    .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + 
partition + "'")
+                    .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
 
             List<Row> expectedFiltered =
                     writtenRows.stream()
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
similarity index 57%
copy from 
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
copy to 
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
index 55dedbeb0..0366ee62d 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSourceTest.java
@@ -16,19 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.lake.paimon.source;
+package org.apache.fluss.lake.iceberg.source;
 
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.RecordReader;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.predicate.FieldRef;
-import org.apache.fluss.predicate.FunctionVisitor;
-import org.apache.fluss.predicate.LeafFunction;
-import org.apache.fluss.predicate.LeafPredicate;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.record.LogRecord;
-import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.IntType;
 import org.apache.fluss.types.RowType;
@@ -36,75 +31,82 @@ import org.apache.fluss.types.StringType;
 import org.apache.fluss.utils.CloseableIterator;
 
 import org.apache.flink.types.Row;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.schema.Schema;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** UT for {@link PaimonLakeSource}. */
-class PaimonLakeSourceTest extends PaimonSourceTestBase {
+/** Test filter push down in {@link IcebergLakeSource}. */
+class IcebergLakeSourceTest extends IcebergSourceTestBase {
 
     private static final Schema SCHEMA =
-            Schema.newBuilder()
-                    .column("id", org.apache.paimon.types.DataTypes.INT())
-                    .column("name", org.apache.paimon.types.DataTypes.STRING())
-                    .column("__bucket", 
org.apache.paimon.types.DataTypes.INT())
-                    .column("__offset", 
org.apache.paimon.types.DataTypes.BIGINT())
-                    .column("__timestamp", 
org.apache.paimon.types.DataTypes.TIMESTAMP(6))
-                    .primaryKey("id")
-                    .option(CoreOptions.BUCKET.key(), "1")
-                    .build();
+            new Schema(
+                    required(1, "id", Types.IntegerType.get()),
+                    optional(2, "name", Types.StringType.get()),
+                    required(3, "__bucket", Types.IntegerType.get()),
+                    required(4, "__offset", Types.LongType.get()),
+                    required(5, "__timestamp", 
Types.TimestampType.withZone()));
+
+    private static final PartitionSpec PARTITION_SPEC =
+            PartitionSpec.builderFor(SCHEMA).bucket("id", 
DEFAULT_BUCKET_NUM).build();
 
     private static final PredicateBuilder FLUSS_BUILDER =
             new PredicateBuilder(RowType.of(DataTypes.BIGINT(), 
DataTypes.STRING()));
 
     @BeforeAll
     protected static void beforeAll() {
-        PaimonSourceTestBase.beforeAll();
+        IcebergSourceTestBase.beforeAll();
     }
 
     @Test
     void testWithFilters() throws Exception {
         TablePath tablePath = TablePath.of("fluss", "test_filters");
-        createTable(tablePath, SCHEMA);
+        createTable(tablePath, SCHEMA, PARTITION_SPEC);
 
         // write some rows
-        List<InternalRow> rows = new ArrayList<>();
+        Table table = getTable(tablePath);
+        List<Record> rows = new ArrayList<>();
         for (int i = 1; i <= 4; i++) {
             rows.add(
-                    GenericRow.of(
+                    createIcebergRecord(
+                            SCHEMA,
                             i,
-                            BinaryString.fromString("name" + i),
+                            "name" + i,
                             0,
                             (long) i,
-                            
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+                            OffsetDateTime.now(ZoneOffset.UTC)));
         }
-        writeRecord(tablePath, rows);
+        writeRecord(table, rows, null, 0);
 
         // write some rows again
+        table.refresh();
         rows = new ArrayList<>();
-        for (int i = 10; i <= 14; i++) {
+        for (int i = 14; i <= 16; i++) {
             rows.add(
-                    GenericRow.of(
+                    createIcebergRecord(
+                            SCHEMA,
                             i,
-                            BinaryString.fromString("name" + i),
+                            "name" + i,
                             0,
                             (long) i,
-                            
Timestamp.fromEpochMillis(System.currentTimeMillis())));
+                            OffsetDateTime.now(ZoneOffset.UTC)));
         }
-        writeRecord(tablePath, rows);
+        writeRecord(table, rows, null, 0);
+        table.refresh();
 
         // test all filter can be accepted
         Predicate filter1 = FLUSS_BUILDER.greaterOrEqual(0, 2);
@@ -113,25 +115,23 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
                 FLUSS_BUILDER.startsWith(1, 
org.apache.fluss.row.BinaryString.fromString("name"));
         List<Predicate> allFilters = Arrays.asList(filter1, filter2, filter3);
 
-        LakeSource<PaimonSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
         LakeSource.FilterPushDownResult filterPushDownResult = 
lakeSource.withFilters(allFilters);
         
assertThat(filterPushDownResult.acceptedPredicates()).isEqualTo(allFilters);
         assertThat(filterPushDownResult.remainingPredicates()).isEmpty();
 
         // read data to verify the filters work
-        List<PaimonSplit> paimonSplits = lakeSource.createPlanner(() -> 
2).plan();
-        assertThat(paimonSplits).hasSize(1);
-        PaimonSplit paimonSplit = paimonSplits.get(0);
-        // make sure we only have one data file after filter to check plan 
will make use
-        // of filters
-        assertThat(paimonSplit.dataSplit().dataFiles()).hasSize(1);
-
-        // read data with filter to mae sure the reader with filter works 
properly
+        List<IcebergSplit> icebergSplits =
+                lakeSource.createPlanner(() -> 
table.currentSnapshot().snapshotId()).plan();
+        assertThat(icebergSplits).hasSize(1);
+        IcebergSplit icebergSplit = icebergSplits.get(0);
+
+        // read data with filter to make sure the reader with filter works 
properly
         List<Row> actual = new ArrayList<>();
         org.apache.fluss.row.InternalRow.FieldGetter[] fieldGetters =
                 org.apache.fluss.row.InternalRow.createFieldGetters(
                         RowType.of(new IntType(), new StringType()));
-        RecordReader recordReader = lakeSource.createRecordReader(() -> 
paimonSplit);
+        RecordReader recordReader = lakeSource.createRecordReader(() -> 
icebergSplit);
         try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
             actual.addAll(
                     convertToFlinkRow(
@@ -142,12 +142,7 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
 
         // test mix one unaccepted filter
         Predicate nonConvertibleFilter =
-                new LeafPredicate(
-                        new UnSupportFilterFunction(),
-                        DataTypes.INT(),
-                        0,
-                        "f1",
-                        Collections.emptyList());
+                FLUSS_BUILDER.endsWith(1, 
org.apache.fluss.row.BinaryString.fromString("name"));
         allFilters = Arrays.asList(nonConvertibleFilter, filter1, filter2);
 
         filterPushDownResult = lakeSource.withFilters(allFilters);
@@ -163,34 +158,4 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
         assertThat(filterPushDownResult.remainingPredicates().toString())
                 .isEqualTo(allFilters.toString());
     }
-
-    private static class UnSupportFilterFunction extends LeafFunction {
-
-        @Override
-        public boolean test(DataType type, Object field, List<Object> 
literals) {
-            return false;
-        }
-
-        @Override
-        public boolean test(
-                DataType type,
-                long rowCount,
-                Object min,
-                Object max,
-                Long nullCount,
-                List<Object> literals) {
-            return false;
-        }
-
-        @Override
-        public Optional<LeafFunction> negate() {
-            return Optional.empty();
-        }
-
-        @Override
-        public <T> T visit(FunctionVisitor<T> visitor, FieldRef fieldRef, 
List<Object> literals) {
-            throw new UnsupportedOperationException(
-                    "Unsupported filter function for test purpose.");
-        }
-    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverterTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverterTest.java
new file mode 100644
index 000000000..1e8bff09d
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/FlussToIcebergPredicateConverterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.utils;
+
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.types.RowType;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.fluss.row.BinaryString.fromString;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link FlussToIcebergPredicateConverter}. */
+class FlussToIcebergPredicateConverterTest {
+
+    private static final PredicateBuilder FLUSS_BUILDER =
+            new PredicateBuilder(
+                    RowType.builder()
+                            .field("f1", DataTypes.BIGINT())
+                            .field("f2", DataTypes.DOUBLE())
+                            .field("f3", DataTypes.STRING())
+                            .build());
+
+    private static final Schema ICEBERG_SCHEMA =
+            new Schema(
+                    required(1, "f1", Types.LongType.get()),
+                    optional(2, "f2", Types.DoubleType.get()),
+                    required(3, "f3", Types.StringType.get()));
+
+    public static Stream<Arguments> parameters() {
+        // A comprehensive set of test cases for different predicate types.
+        return Stream.of(
+                // Leaf Predicates
+                Arguments.of(FLUSS_BUILDER.equal(0, 12L), 
Expressions.equal("f1", 12L)),
+                Arguments.of(
+                        FLUSS_BUILDER.notEqual(2, fromString("test")),
+                        Expressions.notEqual("f3", "test")),
+                Arguments.of(
+                        FLUSS_BUILDER.greaterThan(1, 99.9d), 
Expressions.greaterThan("f2", 99.9d)),
+                Arguments.of(
+                        FLUSS_BUILDER.greaterOrEqual(0, 100L),
+                        Expressions.greaterThanOrEqual("f1", 100L)),
+                Arguments.of(FLUSS_BUILDER.lessThan(1, 0.1d), 
Expressions.lessThan("f2", 0.1d)),
+                Arguments.of(
+                        FLUSS_BUILDER.lessOrEqual(0, 50L), 
Expressions.lessThanOrEqual("f1", 50L)),
+                Arguments.of(FLUSS_BUILDER.isNull(2), 
Expressions.isNull("f3")),
+                Arguments.of(FLUSS_BUILDER.isNotNull(1), 
Expressions.notNull("f2")),
+                Arguments.of(
+                        FLUSS_BUILDER.in(
+                                2,
+                                Stream.of("a", "b", "c")
+                                        .map(BinaryString::fromString)
+                                        .collect(Collectors.toList())),
+                        Stream.of("a", "b", "c")
+                                .map(s -> (Expression) Expressions.equal("f3", 
s))
+                                .reduce(Expressions::or)
+                                .get()),
+                Arguments.of(
+                        FLUSS_BUILDER.in(
+                                2,
+                                Stream.of(
+                                                "a", "b", "c", "a", "b", "c", 
"a", "b", "c", "a",
+                                                "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                                "c", "a", "b", "c")
+                                        .map(BinaryString::fromString)
+                                        .collect(Collectors.toList())),
+                        Expressions.in(
+                                "f3",
+                                Arrays.asList(
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                        "c"))),
+                Arguments.of(
+                        FLUSS_BUILDER.notIn(
+                                2,
+                                Stream.of(
+                                                "a", "b", "c", "a", "b", "c", 
"a", "b", "c", "a",
+                                                "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                                "c", "a", "b", "c")
+                                        .map(BinaryString::fromString)
+                                        .collect(Collectors.toList())),
+                        Expressions.notIn(
+                                "f3",
+                                Arrays.asList(
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b", "c",
+                                        "a", "b", "c", "a", "b", "c", "a", 
"b", "c", "a", "b",
+                                        "c"))),
+                Arguments.of(
+                        FLUSS_BUILDER.startsWith(2, fromString("start")),
+                        Expressions.startsWith("f3", "start")),
+
+                // Compound Predicates
+                Arguments.of(
+                        PredicateBuilder.and(
+                                FLUSS_BUILDER.equal(0, 1L), 
FLUSS_BUILDER.isNotNull(2)),
+                        Expressions.and(Expressions.equal("f1", 1L), 
Expressions.notNull("f3"))),
+                Arguments.of(
+                        PredicateBuilder.or(
+                                FLUSS_BUILDER.lessThan(1, 10.0),
+                                FLUSS_BUILDER.greaterThan(1, 100.0)),
+                        Expressions.or(
+                                Expressions.lessThan("f2", 10.0),
+                                Expressions.greaterThan("f2", 100.0))),
+
+                // Nested Predicate
+                Arguments.of(
+                        PredicateBuilder.and(
+                                FLUSS_BUILDER.equal(2, fromString("test")),
+                                PredicateBuilder.or(
+                                        FLUSS_BUILDER.equal(0, 1L),
+                                        FLUSS_BUILDER.greaterThan(1, 50.0))),
+                        Expressions.and(
+                                Expressions.equal("f3", "test"),
+                                Expressions.or(
+                                        Expressions.equal("f1", 1L),
+                                        Expressions.greaterThan("f2", 
50.0)))));
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    void testPredicateConverter(Predicate flussPredicate, Expression 
expectedPredicate) {
+        Expression convertedIcebergExpression =
+                FlussToIcebergPredicateConverter.convert(ICEBERG_SCHEMA, 
flussPredicate).get();
+        
assertThat(convertedIcebergExpression.toString()).isEqualTo(expectedPredicate.toString());
+    }
+
+    public static Stream<Arguments> parametersNotSupported() {
+        return Stream.of(
+                Arguments.of(FLUSS_BUILDER.endsWith(2, fromString("end"))),
+                Arguments.of(FLUSS_BUILDER.contains(2, fromString("mid"))));
+    }
+
+    @ParameterizedTest
+    @MethodSource("parametersNotSupported")
+    void testNotSupportedPredicateConverter(Predicate flussPredicate) {
+        assertThat(FlussToIcebergPredicateConverter.convert(ICEBERG_SCHEMA, 
flussPredicate))
+                .isEqualTo(Optional.empty());
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
index 084876475..512024807 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
@@ -42,7 +42,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** UT for {@link IcebergConversions}. */
 class IcebergConversionsTest {
-    ;
 
     @Test
     void testToPartition(@TempDir File tempWarehouseDir) {
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
index 979356fa2..5d3e9dc36 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -77,9 +77,10 @@ public class PaimonLakeSource implements LakeSource<PaimonSplit> {
         List<Predicate> unConsumedPredicates = new ArrayList<>();
         List<Predicate> consumedPredicates = new ArrayList<>();
+        List<org.apache.paimon.predicate.Predicate> converted = new ArrayList<>();
+        RowType rowType = getRowType(tablePath);
         for (Predicate predicate : predicates) {
             Optional<org.apache.paimon.predicate.Predicate> optPredicate =
-                    FlussToPaimonPredicateConverter.convert(getRowType(tablePath), predicate);
+                    FlussToPaimonPredicateConverter.convert(rowType, predicate);
             if (optPredicate.isPresent()) {
                 consumedPredicates.add(predicate);
                 converted.add(optPredicate.get());
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
index 55dedbeb0..0d184fc3c 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonLakeSourceTest.java
@@ -126,7 +126,7 @@ class PaimonLakeSourceTest extends PaimonSourceTestBase {
         // of filters
         assertThat(paimonSplit.dataSplit().dataFiles()).hasSize(1);
 
-        // read data with filter to mae sure the reader with filter works properly
+        // read data with filter to make sure the reader with filter works properly
         List<Row> actual = new ArrayList<>();
         org.apache.fluss.row.InternalRow.FieldGetter[] fieldGetters =
                 org.apache.fluss.row.InternalRow.createFieldGetters(

Reply via email to