luoyuxia commented on code in PR #1527:
URL: https://github.com/apache/fluss/pull/1527#discussion_r2271918188


##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.

Review Comment:
   Now you need to use the Apache License header instead.
   See https://fluss.apache.org/community/dev/ide-setup/#copyright-profile
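   For reference, the standard ASF source header looks like this (the copyright profile in the linked guide sets it up automatically):
   ```
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   ```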



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java:
##########
@@ -32,6 +34,9 @@ public static SourceSplitState initializedState(SourceSplitBase split) {
             return new PaimonSnapshotSplitState((PaimonSnapshotSplit) split);

Review Comment:
   We can also remove:
   ```
   if (split instanceof PaimonSnapshotSplit) {
       return new PaimonSnapshotSplitState((PaimonSnapshotSplit) split);
   }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+
+/** An {@link InternalRow} with the key part. */
+public class KeyValueRow {

Review Comment:
   Remove this in this PR.



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+/** . */
+public class SortMergeReader {

Review Comment:
   Remove this in this PR.



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java:
##########
@@ -46,28 +49,31 @@
 public class LakeSplitReaderGenerator {
 
     private final Table table;
-    private final Connection connection;
 
     private final TablePath tablePath;
     private FileStoreTable fileStoreTable;
     private final @Nullable int[] projectedFields;
+    private final @Nullable LakeSource<LakeSplit> lakeSource;
 
     public LakeSplitReaderGenerator(
             Table table,
-            Connection connection,
             TablePath tablePath,
-            @Nullable int[] projectedFields) {
+            @Nullable int[] projectedFields,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.table = table;
-        this.connection = connection;
         this.tablePath = tablePath;
         this.projectedFields = projectedFields;
+        this.lakeSource = lakeSource;
     }
 
     public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) {
         if (split instanceof PaimonSnapshotSplit) {

Review Comment:
   I think we can remove this now.
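   After that, `addSplit` could look roughly like this (sketch only; assumes the `LakeSnapshotSplit` branch requested below is added, and the else-branch behavior is just an assumption):
   ```
   public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) {
       if (split instanceof PaimonSnapshotAndFlussLogSplit) {
           boundedSplits.add(split);
       } else if (split instanceof LakeSnapshotSplit) {
           boundedSplits.add(split);
       } else {
           throw new UnsupportedOperationException(
                   "Unsupported split type: " + split.getClass().getName());
       }
   }
   ```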



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+
+/** . */

Review Comment:
   Please add comments for this class.
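   For example (wording is only a suggestion, based on what the class appears to do):
   ```
   /**
    * A {@link BatchScanner} that reads the records of a {@link LakeSnapshotSplit} from lake
    * storage via the pluggable {@link LakeSource}.
    */
   ```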



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -485,10 +516,13 @@ private List<SourceSplitBase> getLakeSplit() throws Exception {
                 new LakeSplitGenerator(
                         tableInfo,
                         flussAdmin,
+                        lakeSource,
                         bucketOffsetsRetriever,
                         stoppingOffsetsInitializer,
                         tableInfo.getNumBuckets());
-        return lakeSplitGenerator.generateLakeSplits();
+        List<SourceSplitBase> lakeSplits = lakeSplitGenerator.generateHybridLakeSplits();
+        System.out.println(lakeSplits);

Review Comment:
   Remove this.
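   i.e. drop the `System.out.println` and just return the generated splits:
   ```
   return lakeSplitGenerator.generateHybridLakeSplits();
   ```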



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java:
##########
@@ -70,6 +72,8 @@ public class FlinkSource<OUT>
 
     private final List<FieldEqual> partitionFilters;
 
+    final @Nullable LakeSource<LakeSplit> lakeSource;

Review Comment:
   nit:
   ```suggestion
       private final @Nullable LakeSource<LakeSplit> lakeSource;
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -130,6 +133,10 @@ public class FlinkTableSource
 
     private List<FieldEqual> partitionFilters = Collections.emptyList();
 
+    private final Map<String, String> tableOptions;
+
+    @Nullable private LakeSource<LakeSplit> lakeSource;

Review Comment:
   nit:
   ```suggestion
       @Nullable private final LakeSource<LakeSplit> lakeSource;
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A generator for lake splits. */
+public class LakeSplitGenerator {
+
+    private final TableInfo tableInfo;
+    private final Admin flussAdmin;
+    private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
+    private final OffsetsInitializer stoppingOffsetInitializer;
+    private final int bucketCount;
+
+    private final LakeSource<LakeSplit> lakeSource;
+
+    public LakeSplitGenerator(
+            TableInfo tableInfo,
+            Admin flussAdmin,
+            LakeSource<LakeSplit> lakeSource,
+            OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
+            OffsetsInitializer stoppingOffsetInitializer,
+            int bucketCount) {
+        this.tableInfo = tableInfo;
+        this.flussAdmin = flussAdmin;
+        this.lakeSource = lakeSource;
+        this.bucketOffsetsRetriever = bucketOffsetsRetriever;
+        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
+        this.bucketCount = bucketCount;
+    }
+
+    public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
+        // get the file store
+        LakeSnapshot lakeSnapshotInfo =
+                flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+        FileStoreTable fileStoreTable =
+                getTable(
+                        lakeSnapshotInfo.getSnapshotId(),
+                        extractLakeCatalogProperties(tableInfo.getProperties()));
+
+        boolean isLogTable = !tableInfo.hasPrimaryKey();
+        boolean isPartitioned = tableInfo.isPartitioned();
+
+        Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
+                groupLakeSplits(
+                        lakeSource
+                                .createPlanner(
+                                        (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
+                                .plan());
+        if (isPartitioned) {
+            List<PartitionInfo> partitionInfos =
+                    flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+            Map<Long, String> partitionNameById =
+                    partitionInfos.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            PartitionInfo::getPartitionId,
+                                            PartitionInfo::getPartitionName));
+            return generatePartitionTableSplit(
+                    lakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    partitionNameById,
+                    fileStoreTable);
+        } else {
+            Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
+                    lakeSplits.values().iterator().next();
+            // non-partitioned table
+            return generateNoPartitionedTableSplit(
+                    nonPartitionLakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    fileStoreTable);
+        }
+    }
+
+    private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) {
+        Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
+        for (LakeSplit split : lakeSplits) {
+            String partition = String.join("$", split.partition());
+            int bucket = split.bucket();
+            // Get or create the partition group
+            Map<Integer, List<LakeSplit>> bucketMap =
+                    result.computeIfAbsent(partition, k -> new HashMap<>());
+            List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>());
+            splitList.add(split);
+        }
+        return result;
+    }
+
+    private List<SourceSplitBase> generatePartitionTableSplit(
+            Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Long, String> partitionNameById,
+            @Nullable FileStoreTable fileStoreTable)
+            throws Exception {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) {
+            long partitionId = partitionNameByIdEntry.getKey();
+            String partitionName = partitionNameByIdEntry.getValue();
+            Map<Integer, Long> bucketEndOffset =
+                    stoppingOffsetInitializer.getBucketOffsets(
+                            partitionName,
+                            IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
+                            bucketOffsetsRetriever);
+            splits.addAll(
+                    generateSplit(
+                            lakeSplits.get(partitionName),
+                            partitionId,
+                            partitionName,
+                            isLogTable,
+                            tableBucketSnapshotLogOffset,
+                            bucketEndOffset,
+                            fileStoreTable));
+        }
+        return splits;
+    }
+
+    private List<SourceSplitBase> generateSplit(
+            @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
+            @Nullable Long partitionId,
+            @Nullable String partitionName,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Integer, Long> bucketEndOffset,
+            @Nullable FileStoreTable fileStoreTable) {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        if (isLogTable) {
+            // it's log table, we don't care about bucket, and we can't get bucket in paimon's

Review Comment:
   The comments are out of date; remove them.



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A generator for lake splits. */
+public class LakeSplitGenerator {
+
+    private final TableInfo tableInfo;
+    private final Admin flussAdmin;
+    private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
+    private final OffsetsInitializer stoppingOffsetInitializer;
+    private final int bucketCount;
+
+    private final LakeSource<LakeSplit> lakeSource;
+
+    public LakeSplitGenerator(
+            TableInfo tableInfo,
+            Admin flussAdmin,
+            LakeSource<LakeSplit> lakeSource,
+            OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
+            OffsetsInitializer stoppingOffsetInitializer,
+            int bucketCount) {
+        this.tableInfo = tableInfo;
+        this.flussAdmin = flussAdmin;
+        this.lakeSource = lakeSource;
+        this.bucketOffsetsRetriever = bucketOffsetsRetriever;
+        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
+        this.bucketCount = bucketCount;
+    }
+
+    public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
+        // get the file store
+        LakeSnapshot lakeSnapshotInfo =
+                flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+        FileStoreTable fileStoreTable =
+                getTable(
+                        lakeSnapshotInfo.getSnapshotId(),
+                        extractLakeCatalogProperties(tableInfo.getProperties()));
+
+        boolean isLogTable = !tableInfo.hasPrimaryKey();
+        boolean isPartitioned = tableInfo.isPartitioned();
+
+        Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
+                groupLakeSplits(
+                        lakeSource
+                                .createPlanner(
+                                        (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
+                                .plan());
+        if (isPartitioned) {
+            List<PartitionInfo> partitionInfos =
+                    flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+            Map<Long, String> partitionNameById =
+                    partitionInfos.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            PartitionInfo::getPartitionId,
+                                            PartitionInfo::getPartitionName));
+            return generatePartitionTableSplit(
+                    lakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    partitionNameById,
+                    fileStoreTable);
+        } else {
+            Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
+                    lakeSplits.values().iterator().next();
+            // non-partitioned table
+            return generateNoPartitionedTableSplit(
+                    nonPartitionLakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    fileStoreTable);
+        }
+    }
+
+    private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) {
+        Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
+        for (LakeSplit split : lakeSplits) {
+            String partition = String.join("$", split.partition());
+            int bucket = split.bucket();
+            // Get or create the partition group
+            Map<Integer, List<LakeSplit>> bucketMap =
+                    result.computeIfAbsent(partition, k -> new HashMap<>());
+            List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>());
+            splitList.add(split);
+        }
+        return result;
+    }
+
+    private List<SourceSplitBase> generatePartitionTableSplit(
+            Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Long, String> partitionNameById,
+            @Nullable FileStoreTable fileStoreTable)
+            throws Exception {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) {
+            long partitionId = partitionNameByIdEntry.getKey();
+            String partitionName = partitionNameByIdEntry.getValue();
+            Map<Integer, Long> bucketEndOffset =
+                    stoppingOffsetInitializer.getBucketOffsets(
+                            partitionName,
+                            IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
+                            bucketOffsetsRetriever);
+            splits.addAll(
+                    generateSplit(
+                            lakeSplits.get(partitionName),
+                            partitionId,
+                            partitionName,
+                            isLogTable,
+                            tableBucketSnapshotLogOffset,
+                            bucketEndOffset,
+                            fileStoreTable));
+        }
+        return splits;
+    }
+
+    private List<SourceSplitBase> generateSplit(
+            @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
+            @Nullable Long partitionId,
+            @Nullable String partitionName,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Integer, Long> bucketEndOffset,
+            @Nullable FileStoreTable fileStoreTable) {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        if (isLogTable) {
+            // it's log table, we don't care about bucket, and we can't get bucket in paimon's
+            // dynamic bucket; so first generate split for the whole paimon snapshot,
+            // then generate log split for each bucket paimon snapshot + fluss log
+            splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId));

Review Comment:
   Argument `lakeSplits` might be null; skip generating lake snapshot splits when `lakeSplits` is null?
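   Something like (sketch):
   ```
   if (lakeSplits != null) {
       splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId));
   }
   ```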



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java:
##########
@@ -95,6 +95,8 @@ record = deserializationSchema.deserialize(scanRecord);
                     e);
         }
 
+        System.out.println("record: " + record);

Review Comment:
   Remove this.



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A generator for lake splits. */
+public class LakeSplitGenerator {
+
+    private final TableInfo tableInfo;
+    private final Admin flussAdmin;
+    private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
+    private final OffsetsInitializer stoppingOffsetInitializer;
+    private final int bucketCount;
+
+    private final LakeSource<LakeSplit> lakeSource;
+
+    public LakeSplitGenerator(
+            TableInfo tableInfo,
+            Admin flussAdmin,
+            LakeSource<LakeSplit> lakeSource,
+            OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
+            OffsetsInitializer stoppingOffsetInitializer,
+            int bucketCount) {
+        this.tableInfo = tableInfo;
+        this.flussAdmin = flussAdmin;
+        this.lakeSource = lakeSource;
+        this.bucketOffsetsRetriever = bucketOffsetsRetriever;
+        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
+        this.bucketCount = bucketCount;
+    }
+
+    public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
+        // get the file store
+        LakeSnapshot lakeSnapshotInfo =
+                flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+        FileStoreTable fileStoreTable =
+                getTable(
+                        lakeSnapshotInfo.getSnapshotId(),
+                        extractLakeCatalogProperties(tableInfo.getProperties()));
+
+        boolean isLogTable = !tableInfo.hasPrimaryKey();
+        boolean isPartitioned = tableInfo.isPartitioned();
+
+        Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
+                groupLakeSplits(
+                        lakeSource
+                                .createPlanner(
+                                        (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
+                                .plan());
+        if (isPartitioned) {
+            List<PartitionInfo> partitionInfos =
+                    flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+            Map<Long, String> partitionNameById =
+                    partitionInfos.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            PartitionInfo::getPartitionId,
+                                            PartitionInfo::getPartitionName));
+            return generatePartitionTableSplit(
+                    lakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    partitionNameById,
+                    fileStoreTable);
+        } else {
+            Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
+                    lakeSplits.values().iterator().next();
+            // non-partitioned table
+            return generateNoPartitionedTableSplit(
+                    nonPartitionLakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    fileStoreTable);
+        }
+    }
+
+    private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) {
+        Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
+        for (LakeSplit split : lakeSplits) {
+            String partition = String.join("$", split.partition());
+            int bucket = split.bucket();
+            // Get or create the partition group
+            Map<Integer, List<LakeSplit>> bucketMap =
+                    result.computeIfAbsent(partition, k -> new HashMap<>());
+            List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>());
+            splitList.add(split);
+        }
+        return result;
+    }
+
+    private List<SourceSplitBase> generatePartitionTableSplit(
+            Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Long, String> partitionNameById,
+            @Nullable FileStoreTable fileStoreTable)
+            throws Exception {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) {
+            long partitionId = partitionNameByIdEntry.getKey();
+            String partitionName = partitionNameByIdEntry.getValue();
+            Map<Integer, Long> bucketEndOffset =
+                    stoppingOffsetInitializer.getBucketOffsets(
+                            partitionName,
+                            IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
+                            bucketOffsetsRetriever);
+            splits.addAll(
+                    generateSplit(
+                            lakeSplits.get(partitionName),
+                            partitionId,
+                            partitionName,
+                            isLogTable,
+                            tableBucketSnapshotLogOffset,
+                            bucketEndOffset,
+                            fileStoreTable));
+        }
+        return splits;
+    }
+
+    private List<SourceSplitBase> generateSplit(
+            @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
+            @Nullable Long partitionId,
+            @Nullable String partitionName,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Integer, Long> bucketEndOffset,
+            @Nullable FileStoreTable fileStoreTable) {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        if (isLogTable) {
+            // it's log table, we don't care about bucket, and we can't get bucket in paimon's
+            // dynamic bucket; so first generate split for the whole paimon snapshot,
+            // then generate log split for each bucket paimon snapshot + fluss log
+            splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId));
+            for (int bucket = 0; bucket < bucketCount; bucket++) {
+                TableBucket tableBucket =
+                        new TableBucket(tableInfo.getTableId(), partitionId, bucket);
+                Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
+                long stoppingOffset = bucketEndOffset.get(bucket);
+                if (snapshotLogOffset == null) {
+                    // no any data commit to this bucket, scan from fluss log
+                    splits.add(
+                            new LogSplit(
+                                    tableBucket, partitionName, EARLIEST_OFFSET, stoppingOffset));
+                } else {
+                    // need to read remain fluss log
+                    if (snapshotLogOffset < stoppingOffset) {
+                        splits.add(
+                                new LogSplit(
+                                        tableBucket,
+                                        partitionName,
+                                        snapshotLogOffset,
+                                        stoppingOffset));
+                    }
+                }
+            }
+        } else {
+            // it's primary key table
+            for (int bucket = 0; bucket < bucketCount; bucket++) {
+                TableBucket tableBucket =
+                        new TableBucket(tableInfo.getTableId(), partitionId, bucket);
+                Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
+                long stoppingOffset = bucketEndOffset.get(bucket);
+                FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
+
+                splits.add(
+                        generateSplitForPrimaryKeyTableBucket(
+                                fileStoreTable,
+                                splitGenerator,
+                                tableBucket,
+                                partitionName,
+                                snapshotLogOffset,
+                                stoppingOffset));
+            }
+        }
+
+        return splits;
+    }
+
+    private List<SourceSplitBase> toLakeSnapshotSplits(
+            Map<Integer, List<LakeSplit>> lakeSplits,
+            @Nullable String partitionName,
+            @Nullable Long partitionId) {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, -1);

Review Comment:
   ```
   List<SourceSplitBase> splits = new ArrayList<>();
   for (LakeSplit lakeSplit :
           lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList())) {
       TableBucket tableBucket =
               new TableBucket(tableInfo.getTableId(), partitionId, lakeSplit.bucket());
       splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit));
   }
   return splits;
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -382,10 +397,18 @@ public boolean supportsNestedProjection() {
     public void applyProjection(int[][] projectedFields, DataType producedDataType) {
         this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
         this.producedDataType = producedDataType.getLogicalType();
+        if (lakeSource != null) {
+            lakeSource.withProject(projectedFields);
+        }
     }
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
+        if (lakeSource != null) {
+            // todo: use real filters
+            lakeSource.withFilters(Collections.emptyList());

Review Comment:
   But partition filter push-down is a must in https://github.com/apache/fluss/pull/1527/files#diff-ebec624f8a39fadb5f5ab74ac8b8c32a2ae3cb3d497841807595b27bb4aa57d3R457
   I created issue #1141 to track it. You just need to leave a todo for #1141 in https://github.com/apache/fluss/pull/1527/files#diff-ebec624f8a39fadb5f5ab74ac8b8c32a2ae3cb3d497841807595b27bb4aa57d3R457 for this PR.
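   e.g. something like this at that line (sketch; exact wording doesn't matter):
   ```
   // TODO: push partition filters down to the lake source, see #1141
   ```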



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A generator for lake splits. */
+public class LakeSplitGenerator {
+
+    private final TableInfo tableInfo;
+    private final Admin flussAdmin;
+    private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
+    private final OffsetsInitializer stoppingOffsetInitializer;
+    private final int bucketCount;
+
+    private final LakeSource<LakeSplit> lakeSource;
+
+    public LakeSplitGenerator(
+            TableInfo tableInfo,
+            Admin flussAdmin,
+            LakeSource<LakeSplit> lakeSource,
+            OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
+            OffsetsInitializer stoppingOffsetInitializer,
+            int bucketCount) {
+        this.tableInfo = tableInfo;
+        this.flussAdmin = flussAdmin;
+        this.lakeSource = lakeSource;
+        this.bucketOffsetsRetriever = bucketOffsetsRetriever;
+        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
+        this.bucketCount = bucketCount;
+    }
+
+    public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
+        // get the file store
+        LakeSnapshot lakeSnapshotInfo =
+                flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+        FileStoreTable fileStoreTable =
+                getTable(
+                        lakeSnapshotInfo.getSnapshotId(),
+                        extractLakeCatalogProperties(tableInfo.getProperties()));
+
+        boolean isLogTable = !tableInfo.hasPrimaryKey();
+        boolean isPartitioned = tableInfo.isPartitioned();
+
+        Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
+                groupLakeSplits(
+                        lakeSource
+                                .createPlanner(
+                                        (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
+                                .plan());
+        if (isPartitioned) {
+            List<PartitionInfo> partitionInfos =
+                    flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+            Map<Long, String> partitionNameById =
+                    partitionInfos.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            PartitionInfo::getPartitionId,
+                                            PartitionInfo::getPartitionName));
+            return generatePartitionTableSplit(
+                    lakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    partitionNameById,
+                    fileStoreTable);
+        } else {
+            Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
+                    lakeSplits.values().iterator().next();
+            // non-partitioned table
+            return generateNoPartitionedTableSplit(
+                    nonPartitionLakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    fileStoreTable);
+        }
+    }
+
+    private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) {
+        Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
+        for (LakeSplit split : lakeSplits) {
+            String partition = String.join("$", split.partition());
+            int bucket = split.bucket();
+            // Get or create the partition group
+            Map<Integer, List<LakeSplit>> bucketMap =
+                    result.computeIfAbsent(partition, k -> new HashMap<>());
+            List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>());
+            splitList.add(split);
+        }
+        return result;
+    }
+
+    private List<SourceSplitBase> generatePartitionTableSplit(
+            Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Long, String> partitionNameById,
+            @Nullable FileStoreTable fileStoreTable)
+            throws Exception {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) {

Review Comment:
   With this logic, if Fluss only has partitions p1 and p2 due to partition TTL, but the lake table has partitions p0, p1, and p2, then p0 will not be read.
   Suggest something like:
   ```
   List<SourceSplitBase> splits = new ArrayList<>();
   Map<String, Long> flussPartitionIdByName =
           flussPartitionNameById.entrySet().stream()
                   .collect(
                           Collectors.toMap(
                                   Map.Entry::getValue,
                                   Map.Entry::getKey,
                                   (existing, replacement) -> existing,
                                   LinkedHashMap::new));
   long lakeSplitPartitionId = -1L;

   // iterate lake splits
   for (Map.Entry<String, Map<Integer, List<LakeSplit>>> lakeSplitEntry :
           lakeSplits.entrySet()) {
       String partitionName = lakeSplitEntry.getKey();
       Map<Integer, List<LakeSplit>> lakeSplitsOfPartition = lakeSplitEntry.getValue();
       Long partitionId = flussPartitionIdByName.remove(partitionName);
       if (partitionId != null) {
           // means the partition also exists in fluss
           Map<Integer, Long> bucketEndOffset =
                   stoppingOffsetInitializer.getBucketOffsets(
                           partitionName,
                           IntStream.range(0, bucketCount)
                                   .boxed()
                                   .collect(Collectors.toList()),
                           bucketOffsetsRetriever);
           splits.addAll(
                   generateSplit(
                           lakeSplitsOfPartition,
                           partitionId,
                           partitionName,
                           isLogTable,
                           tableBucketSnapshotLogOffset,
                           bucketEndOffset,
                           fileStoreTable));
       } else {
           // only lake data
           splits.addAll(
                   toLakeSnapshotSplits(
                           lakeSplitsOfPartition,
                           partitionName,
                           // now, we can't get a partition id for a partition that only
                           // exists in the lake; set it to an arbitrary partition id, but
                           // make sure different partitions get different partition ids
                           // so that different partitions can be distributed to
                           // different tasks
                           lakeSplitPartitionId--));
       }
   }

   // iterate the remaining fluss partitions
   for (Map.Entry<String, Long> partitionIdByNameEntry : flussPartitionIdByName.entrySet()) {
       String partitionName = partitionIdByNameEntry.getKey();
       long partitionId = partitionIdByNameEntry.getValue();
       Map<Integer, Long> bucketEndOffset =
               stoppingOffsetInitializer.getBucketOffsets(
                       partitionName,
                       IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
                       bucketOffsetsRetriever);
       splits.addAll(
               generateSplit(
                       null,
                       partitionId,
                       partitionName,
                       isLogTable,
                       // pass empty map since we won't read lake splits
                       Collections.emptyMap(),
                       bucketEndOffset,
                       fileStoreTable));
   }
   return splits;
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -382,10 +397,18 @@ public boolean supportsNestedProjection() {
     public void applyProjection(int[][] projectedFields, DataType producedDataType) {
         this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
         this.producedDataType = producedDataType.getLogicalType();
+        if (lakeSource != null) {
+            lakeSource.withProject(projectedFields);
+        }
     }
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
+        if (lakeSource != null) {
+            // todo: use real filters
+            lakeSource.withFilters(Collections.emptyList());

Review Comment:
   Let's just do nothing here, and leave a todo to apply filters in future work.
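   i.e. drop the `lakeSource.withFilters(...)` call and keep only a marker, something like (sketch):
   ```
   // TODO: push the resolved filters down to the lake source in future work
   ```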



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.state;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.flink.source.split.SourceSplitState;
+
+/** . */

Review Comment:
   Add comments.
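   For example (only a suggestion):
   ```
   /** The split state of a {@link LakeSnapshotSplit}. */
   ```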



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java:
##########
@@ -17,19 +17,25 @@
 
 package com.alibaba.fluss.flink.source.split;
 
-import com.alibaba.fluss.flink.lakehouse.LakeSplitSerializer;
+import com.alibaba.fluss.flink.lake.LakeSplitSerializer;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.TableBucket;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
 /** A serializer for the {@link SourceSplitBase}. */
 public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSplitBase> {
 
-    public static final SourceSplitSerializer INSTANCE = new SourceSplitSerializer();
+    public static final SourceSplitSerializer INSTANCE = new SourceSplitSerializer(null);

Review Comment:
   Remove this since it's error-prone and only used in testing code.



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.utils;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lakestorage.LakeStorage;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TablePath;
+
+import java.util.Map;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** */

Review Comment:
   Add comments.
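   For example (only a suggestion, based on the imports):
   ```
   /** Utils to create the {@link LakeSource} of a table from the configured lake storage plugin. */
   ```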



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java:
##########
@@ -46,28 +49,31 @@
 public class LakeSplitReaderGenerator {
 
     private final Table table;
-    private final Connection connection;
 
     private final TablePath tablePath;
     private FileStoreTable fileStoreTable;
     private final @Nullable int[] projectedFields;
+    private final @Nullable LakeSource<LakeSplit> lakeSource;
 
     public LakeSplitReaderGenerator(
             Table table,
-            Connection connection,
             TablePath tablePath,
-            @Nullable int[] projectedFields) {
+            @Nullable int[] projectedFields,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.table = table;
-        this.connection = connection;
         this.tablePath = tablePath;
         this.projectedFields = projectedFields;
+        this.lakeSource = lakeSource;
     }
 
     public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) {
         if (split instanceof PaimonSnapshotSplit) {
             boundedSplits.add(split);
         } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
             boundedSplits.add(split);
+        } else if (split instanceof LakeSnapshotSplit) {

Review Comment:
   We should support `LakeSnapshotSplit` in this PR.
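   e.g. (sketch, assuming the split is bounded like the Paimon ones):
   ```
   } else if (split instanceof LakeSnapshotSplit) {
       boundedSplits.add(split);
   }
   ```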



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
