luoyuxia commented on code in PR #1684:
URL: https://github.com/apache/fluss/pull/1684#discussion_r2343427380


##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.Planner;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
+
+/** Iceberg lake source. */
+public class IcebergLakeSource implements LakeSource<IcebergSplit> {
+    private final Configuration icebergConfig;

Review Comment:
   nit:
   ```
   private static final long serialVersionUID = 1L;
   ```



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.temporal.ChronoField;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Iceberg record reader. */
+public class IcebergRecordReader implements RecordReader {
+    protected IcebergRecordAsFlussRecordIterator iterator;
+    protected @Nullable int[][] project;
+    protected Types.StructType struct;
+
+    public IcebergRecordReader(FileScanTask fileScanTask, Table table, 
@Nullable int[][] project) {
+        TableScan tableScan = table.newScan();
+        if (project != null) {
+            tableScan = applyProject(tableScan, project);
+        }
+        IcebergGenericReader reader = new IcebergGenericReader(tableScan, 
true);
+        struct = tableScan.schema().asStruct();
+        this.iterator = new 
IcebergRecordAsFlussRecordIterator(reader.open(fileScanTask), struct);
+    }
+
+    @Override
+    public CloseableIterator<LogRecord> read() throws IOException {
+        return iterator;
+    }
+
+    private TableScan applyProject(TableScan tableScan, int[][] projects) {
+        Types.StructType structType = tableScan.schema().asStruct();
+        List<Types.NestedField> cols = new ArrayList<>(projects.length + 2);
+
+        for (int[] project : projects) {
+            // iceberg field index starts from 1
+            cols.add(structType.field(project[0] + 1));
+        }
+
+        cols.add(structType.field(OFFSET_COLUMN_NAME));
+        cols.add(structType.field(TIMESTAMP_COLUMN_NAME));
+        return tableScan.project(new Schema(cols));
+    }
+
+    /** Iterator for iceberg record as fluss record. */
+    public static class IcebergRecordAsFlussRecordIterator implements 
CloseableIterator<LogRecord> {
+
+        private final org.apache.iceberg.io.CloseableIterator<Record> 
icebergRecordIterator;
+
+        private final ProjectedRow projectedRow;
+        private final IcebergRecordAsFlussRow icebergRecordAsFlussRow;
+
+        private final int logOffsetColIndex;
+        private final int timestampColIndex;
+
+        public IcebergRecordAsFlussRecordIterator(
+                CloseableIterable<Record> icebergRecordIterator, 
Types.StructType struct) {
+            this.icebergRecordIterator = icebergRecordIterator.iterator();
+            this.logOffsetColIndex = 
struct.fields().indexOf(struct.field(OFFSET_COLUMN_NAME));
+            this.timestampColIndex = 
struct.fields().indexOf(struct.field(TIMESTAMP_COLUMN_NAME));
+
+            int[] project = IntStream.range(0, struct.fields().size() - 
2).toArray();
+            projectedRow = ProjectedRow.from(project);
+            icebergRecordAsFlussRow = new IcebergRecordAsFlussRow();
+        }
+
+        @Override
+        public void close() {
+            try {
+                icebergRecordIterator.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Fail to close iterator.", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return icebergRecordIterator.hasNext();
+        }
+
+        @Override
+        public LogRecord next() {
+            Record icebergRecord = icebergRecordIterator.next();
+            long offset = icebergRecord.get(logOffsetColIndex, Long.class);
+            long timestamp =
+                    icebergRecord
+                            .get(timestampColIndex, OffsetDateTime.class)
+                            .getLong(ChronoField.MILLI_OF_SECOND);

Review Comment:
   should use 
   ```
   OffsetDateTime offsetDateTime =
                       icebergRecord.get(timestampColIndex, 
OffsetDateTime.class);
               long timestamp =
                       offsetDateTime.getLong(ChronoField.INSTANT_SECONDS) * 
1000
                               + 
offsetDateTime.getLong(ChronoField.MILLI_OF_SECOND);
   ```
   MILLI_OF_SECOND just return `0 to 999` as the java doc said:
   ```
   This counts the millisecond within the second, from 0 to 999. This field has 
the same meaning for all calendar systems.
   ```



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java:
##########
@@ -45,6 +45,11 @@ public void setIcebergRecord(Record icebergRecord) {
         this.icebergRecord = icebergRecord;
     }
 
+    public IcebergRecordAsFlussRow replaceRow(Record icebergRecord) {

Review Comment:
   ```suggestion
       public IcebergRecordAsFlussRow replaceIcebergRecord(Record 
icebergRecord) {
   ```
   Then we can remove `setIcebergRecord` method



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplit.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSplit;
+
+import org.apache.iceberg.FileScanTask;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Split for Iceberg table. */
+public class IcebergSplit implements LakeSplit, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final FileScanTask fileScanTask;
+    private final int bucket;
+    private final List<String> partition;
+
+    public IcebergSplit(FileScanTask fileScanTask, int bucket, List<String> 
partition) {
+        this.fileScanTask = fileScanTask;
+        this.bucket = bucket;
+        this.partition = partition;
+    }
+
+    @Override
+    public int bucket() {
+        return bucket;

Review Comment:
   an performance improve, if it's not bucket awared - fluss log table, we can 
just return -1 which will improve performance.  see 
https://github.com/apache/fluss/issues/1678
   We can have another pr to do the improvoment if you want. But remember to 
left a todo to mark it .



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
+import org.apache.fluss.lake.source.Planner;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.transforms.TransformUtils;
+import org.apache.iceberg.types.Types;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
+
+/** Iceberg split planner. */
+public class IcebergSplitPlanner implements Planner<IcebergSplit> {
+
+    private final Configuration icebergConfig;
+    private final TablePath tablePath;
+    private final long snapshotId;
+
+    public IcebergSplitPlanner(Configuration icebergConfig, TablePath 
tablePath, long snapshotId) {
+        this.icebergConfig = icebergConfig;
+        this.tablePath = tablePath;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public List<IcebergSplit> plan() throws IOException {
+        List<IcebergSplit> splits = new ArrayList<>();
+        Catalog catalog = createIcebergCatalog(icebergConfig);
+        Table table = catalog.loadTable(toIceberg(tablePath));
+        Function<FileScanTask, Integer> bucketExtract = 
createBucketExtractor(table);
+        Function<FileScanTask, List<String>> partitionExtract = 
createPartitionExtractor(table);
+        try (CloseableIterable<FileScanTask> tasks =
+                table.newScan()
+                        .useSnapshot(snapshotId)
+                        .includeColumnStats()
+                        .ignoreResiduals()
+                        .planFiles()) {
+            tasks.forEach(
+                    task ->
+                            splits.add(
+                                    new IcebergSplit(
+                                            task,
+                                            bucketExtract.apply(task),
+                                            partitionExtract.apply(task))));
+        }
+        return splits;
+    }
+
+    private Function<FileScanTask, Integer> createBucketExtractor(Table table) 
{
+        PartitionSpec partitionSpec = table.spec();
+        List<PartitionField> partitionFields = partitionSpec.fields();
+
+        List<PartitionField> bucketFields =
+                partitionFields.stream()
+                        .filter(field -> 
TransformUtils.isBucketTransform(field.transform()))
+                        .collect(Collectors.toList());
+
+        if (bucketFields.size() != 1) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only one bucket key is supported for Iceberg at 
the moment, but found %d",
+                            bucketFields.size()));
+        }
+
+        PartitionField bucketField = bucketFields.get(0);
+        Types.StructType partitionType = partitionSpec.partitionType();
+        int bucketFieldIndex =
+                
partitionType.fields().indexOf(partitionType.field(bucketField.fieldId()));
+
+        return task -> task.file().partition().get(bucketFieldIndex, 
Integer.class);
+    }
+
+    private Function<FileScanTask, List<String>> 
createPartitionExtractor(Table table) {
+        PartitionSpec partitionSpec = table.spec();
+        List<PartitionField> partitionFields = partitionSpec.fields();
+        Types.StructType partitionType = partitionSpec.partitionType();
+
+        List<Integer> nonBucketFieldIndices =
+                partitionFields.stream()
+                        .filter(field -> 
!TransformUtils.isBucketTransform(field.transform()))
+                        .map(
+                                field ->
+                                        partitionType
+                                                .fields()
+                                                
.indexOf(partitionType.field(field.fieldId())))
+                        .collect(Collectors.toList());
+
+        return task ->
+                nonBucketFieldIndices.stream()
+                        .map(index -> task.partition().get(index, 
String.class))
+                        .collect(Collectors.toList());
+    }
+
+    private Catalog createIcebergCatalog(Configuration configuration) {

Review Comment:
   We can extract the code into a common utils since it appears in many places.



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/iceberg/data/IcebergGenericReader.java:
##########
@@ -23,7 +23,7 @@
 
 /**
  * GenericReader to read for records for iceberg. Extends from Iceberg {@link 
GenericReader} to
- * enable the {@link #open(FileScanTask)} method to be visible to Fluss.
+ * enable the {@link FileScanTask} method to be visible to Fluss.

Review Comment:
   why change this ?



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
+import org.apache.fluss.lake.source.Planner;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.transforms.TransformUtils;
+import org.apache.iceberg.types.Types;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
+
+/** Iceberg split planner. */
+public class IcebergSplitPlanner implements Planner<IcebergSplit> {
+
+    private final Configuration icebergConfig;
+    private final TablePath tablePath;
+    private final long snapshotId;
+
+    public IcebergSplitPlanner(Configuration icebergConfig, TablePath 
tablePath, long snapshotId) {
+        this.icebergConfig = icebergConfig;
+        this.tablePath = tablePath;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public List<IcebergSplit> plan() throws IOException {
+        List<IcebergSplit> splits = new ArrayList<>();
+        Catalog catalog = createIcebergCatalog(icebergConfig);
+        Table table = catalog.loadTable(toIceberg(tablePath));
+        Function<FileScanTask, Integer> bucketExtract = 
createBucketExtractor(table);
+        Function<FileScanTask, List<String>> partitionExtract = 
createPartitionExtractor(table);
+        try (CloseableIterable<FileScanTask> tasks =
+                table.newScan()
+                        .useSnapshot(snapshotId)
+                        .includeColumnStats()
+                        .ignoreResiduals()
+                        .planFiles()) {
+            tasks.forEach(
+                    task ->
+                            splits.add(
+                                    new IcebergSplit(
+                                            task,
+                                            bucketExtract.apply(task),
+                                            partitionExtract.apply(task))));
+        }
+        return splits;
+    }
+
+    private Function<FileScanTask, Integer> createBucketExtractor(Table table) 
{
+        PartitionSpec partitionSpec = table.spec();

Review Comment:
   It's more complex to extract bucket and partition......
   For log table:
   - if not bucket key defined in fluss, it'll partition by the __bucket 
column, which we is the bucket. But one good news is that we can just set it to 
-1 since we don't care about the bucket for the split 
   - If is with bucket key, it does partition by the bucket key column.



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test union read log table with full type. */
+public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkUnionReadTestBase.beforeAll();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableFullType(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new ArrayList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read paimon
+        // may read fluss or not, depends on the log offset of paimon snapshot
+        List<Row> actual =
+                CollectionUtil.iteratorToList(
+                        batchTEnv.executeSql("select * from " + 
tableName).collect());
+
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
+
+        // can database sync job

Review Comment:
   ```suggestion
           // cancel the tiering job
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergRecordReader}. */
+public class IcebergRecordReaderTest extends IcebergSourceTestBase {

Review Comment:
   nit:
   ```suggestion
   class IcebergRecordReaderTest extends IcebergSourceTestBase {
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test union read log table with full type. */
+public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkUnionReadTestBase.beforeAll();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableFullType(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new ArrayList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read paimon

Review Comment:
   ```suggestion
           // now, start to read the log table, which will read iceberg
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergRecordReader}. */
+public class IcebergRecordReaderTest extends IcebergSourceTestBase {
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testReadLogTable(boolean isPartitioned) throws Exception {
+        // prepare iceberg table
+        TablePath tablePath =
+                TablePath.of(
+                        DEFAULT_DB, isPartitioned ? DEFAULT_TABLE + 
"_partitioned" : DEFAULT_TABLE);
+        createFullTypeLogTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM);
+
+        InternalRow.FieldGetter[] fieldGetters =
+                InternalRow.createFieldGetters(
+                        RowType.of(
+                                new BigIntType(),
+                                new StringType(),
+                                new IntType(),
+                                new DoubleType(),
+                                new BooleanType(),
+                                new TinyIntType(),
+                                new SmallIntType(),
+                                new FloatType(),
+                                new DecimalType(10, 2),
+                                new TimestampType(),
+                                new LocalZonedTimestampType(),
+                                new BinaryType(),
+                                new CharType(),
+                                new StringType()));
+
+        // write data
+        Table table = getTable(tablePath);
+        List<InternalRow> writtenRows = new ArrayList<>();
+        writtenRows.addAll(writeFullTypeRows(table, 10, isPartitioned ? "p1" : 
null));
+        writtenRows.addAll(writeFullTypeRows(table, 20, isPartitioned ? "p2" : 
null));
+
+        // refresh table
+        table.refresh();
+        Snapshot snapshot = table.currentSnapshot();
+
+        List<Row> actual = new ArrayList<>();
+
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        List<IcebergSplit> icebergSplits = 
lakeSource.createPlanner(snapshot::snapshotId).plan();
+        for (IcebergSplit icebergSplit : icebergSplits) {
+            RecordReader recordReader = lakeSource.createRecordReader(() -> 
icebergSplit);
+            CloseableIterator<LogRecord> iterator = recordReader.read();
+            actual.addAll(
+                    convertToFlinkRow(
+                            fieldGetters,
+                            TransformingCloseableIterator.transform(iterator, 
LogRecord::getRow)));
+            iterator.close();
+        }
+        List<Row> expectRows =
+                convertToFlinkRow(fieldGetters, 
CloseableIterator.wrap(writtenRows.iterator()));
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+    }
+
+    @Test
+    void testReadLogTableWithProject() throws Exception {

Review Comment:
   nit:
   Is it possible to merge testReadLogTableWithProject into `testReadLogTable`?
   So that we can use the preparing records logic.  



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test union read log table with full type. */
+public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkUnionReadTestBase.beforeAll();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableFullType(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new ArrayList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read paimon
+        // may read fluss or not, depends on the log offset of paimon snapshot
+        List<Row> actual =
+                CollectionUtil.iteratorToList(
+                        batchTEnv.executeSql("select * from " + 
tableName).collect());
+
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
+
+        // can database sync job
+        jobClient.cancel().get();
+
+        // write some log data again
+        writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+        // query the log table again and check the data
+        // it should read both paimon snapshot and fluss log
+        actual =
+                CollectionUtil.iteratorToList(
+                        batchTEnv.executeSql("select * from " + 
tableName).collect());
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
+
+        // test project push down
+        actual =
+                CollectionUtil.iteratorToList(
+                        batchTEnv.executeSql("select f_byte from " + 
tableName).collect());
+        List<Row> expected =
+                writtenRows.stream()
+                        .map(row -> Row.of(row.getField(1)))
+                        .collect(Collectors.toList());
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+        if (isPartitioned) {
+            // get first partition
+            String partition = 
waitUntilPartitions(t1).values().iterator().next();
+            String sqlWithPartitionFilter =
+                    "select * FROM " + tableName + " WHERE p = '" + partition 
+ "'";
+
+            String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
+
+            // check if the plan contains partition filter

Review Comment:
   so, after filter is push to iceberg, the plan won't contains partition 
filter? Right?
   Left a todo to mark it?



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergSplitPlanner}. */
+public class IcebergSplitPlannerTest extends IcebergSourceTestBase {

Review Comment:
   Thanks for taking this as consideration



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergSplitPlanner}. */
+public class IcebergSplitPlannerTest extends IcebergSourceTestBase {

Review Comment:
   ```suggestion
   class IcebergSplitPlannerTest extends IcebergSourceTestBase {
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergSplitPlanner}. */
+public class IcebergSplitPlannerTest extends IcebergSourceTestBase {
+    @Test
+    void testTablePlan() throws Exception {
+        // prepare iceberg table
+        TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+        Schema schema =
+                new Schema(
+                        optional(1, "c1", Types.IntegerType.get()),
+                        optional(2, "c2", Types.StringType.get()),
+                        optional(3, "c3", Types.StringType.get()));
+        PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema).bucket("c1", 2).build();
+        createTable(tablePath, schema, partitionSpec);
+
+        // write data
+        Table table = getTable(tablePath);
+        GenericRecord record1 = GenericRecord.create(table.schema());
+        record1.set(0, 12);
+        record1.set(1, "a");
+        record1.set(2, "A");
+        GenericRecord record2 = GenericRecord.create(table.schema());
+        record2.set(0, 13);
+        record2.set(1, "b");
+        record2.set(2, "B");
+
+        writeRecord(table, Collections.singletonList(record1), null, 0);
+        writeRecord(table, Collections.singletonList(record2), null, 1);
+
+        // refresh table
+        table.refresh();
+        Snapshot snapshot = table.currentSnapshot();
+
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        List<IcebergSplit> icebergSplits = 
lakeSource.createPlanner(snapshot::snapshotId).plan();
+        assertThat(icebergSplits.size()).isEqualTo(2);
+        assertThat(icebergSplits.stream().map(IcebergSplit::bucket))
+                .containsExactlyInAnyOrder(0, 1);
+    }
+
+    @Test
+    void testPartitionTablePlan() throws Exception {
+        // prepare iceberg table
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "partition_" + 
DEFAULT_TABLE);
+        Schema schema =
+                new Schema(
+                        optional(1, "c1", Types.IntegerType.get()),
+                        optional(2, "c2", Types.StringType.get()),
+                        optional(3, "c3", Types.StringType.get()));
+        PartitionSpec partitionSpec =
+                PartitionSpec.builderFor(schema).identity("c2").bucket("c1", 
2).build();
+        createTable(tablePath, schema, partitionSpec);
+
+        // write data
+        Table table = getTable(tablePath);
+        GenericRecord record1 = GenericRecord.create(table.schema());
+        record1.set(0, 12);
+        record1.set(1, "a");
+        record1.set(2, "A");
+        GenericRecord record2 = GenericRecord.create(table.schema());
+        record2.set(0, 13);
+        record2.set(1, "b");
+        record2.set(2, "B");
+
+        writeRecord(table, Collections.singletonList(record1), "a", 0);
+        writeRecord(table, Collections.singletonList(record2), "b", 1);
+
+        // refresh table
+        table.refresh();
+        Snapshot snapshot = table.currentSnapshot();
+
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        List<IcebergSplit> icebergSplits = 
lakeSource.createPlanner(snapshot::snapshotId).plan();
+        assertThat(icebergSplits.size()).isEqualTo(2);
+        assertThat(icebergSplits.stream().map(IcebergSplit::bucket))

Review Comment:
   Can you please check the bucket and partition for icebergSplits[0] and 
icebergSplits[1] one by one? It'll be more clear.



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergSplitPlanner}. */
+public class IcebergSplitPlannerTest extends IcebergSourceTestBase {
+    @Test
+    void testTablePlan() throws Exception {
+        // prepare iceberg table
+        TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+        Schema schema =
+                new Schema(
+                        optional(1, "c1", Types.IntegerType.get()),
+                        optional(2, "c2", Types.StringType.get()),
+                        optional(3, "c3", Types.StringType.get()));
+        PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema).bucket("c1", 2).build();
+        createTable(tablePath, schema, partitionSpec);
+
+        // write data
+        Table table = getTable(tablePath);
+        GenericRecord record1 = GenericRecord.create(table.schema());
+        record1.set(0, 12);

Review Comment:
   nit:
   add a method 
   ```
   private void setRecordValues(GenericRecord record, Object... values) {
           for (int i = 0; i < values.length; i++) {
               record.set(i, values[i]);
           }
       }
   ```
   then use 
   ```
   setRecordValues(record1, 12, "a", "A");
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergSplitPlanner}. */
+public class IcebergSplitPlannerTest extends IcebergSourceTestBase {
+    @Test
+    void testTablePlan() throws Exception {
+        // prepare iceberg table
+        TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+        Schema schema =
+                new Schema(
+                        optional(1, "c1", Types.IntegerType.get()),
+                        optional(2, "c2", Types.StringType.get()),
+                        optional(3, "c3", Types.StringType.get()));
+        PartitionSpec partitionSpec = 
PartitionSpec.builderFor(schema).bucket("c1", 2).build();

Review Comment:
   We should also cover when iceberg table partition spec is `__bucket` column 
which is mapped by fluss log table without bucket key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to