This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4cf486d68 [flink] Union read decouple with paimon for log table (#1527)
4cf486d68 is described below

commit 4cf486d6808e0babadade19f40812c2bf15d54a1
Author: CaoZhen <caozhen1...@outlook.com>
AuthorDate: Wed Aug 13 04:50:20 2025 -0700

    [flink] Union read decouple with paimon for log table (#1527)
---
 .../alibaba/fluss/flink/catalog/FlinkCatalog.java  |   2 +-
 .../fluss/flink/catalog/FlinkTableFactory.java     |   5 +-
 .../flink/{lakehouse => lake}/LakeCatalog.java     |   2 +-
 .../LakeRecordRecordEmitter.java                   |   6 +-
 .../fluss/flink/lake/LakeSplitGenerator.java       | 391 +++++++++++++++++++++
 .../LakeSplitReaderGenerator.java                  |  25 +-
 .../fluss/flink/lake/LakeSplitSerializer.java      | 126 +++++++
 .../LakeSplitStateInitializer.java                 |  13 +-
 .../{lakehouse => lake}/LakeTableFactory.java      |   2 +-
 .../flink/lake/reader/LakeSnapshotScanner.java     |  97 +++++
 .../fluss/flink/lake/split/LakeSnapshotSplit.java  |  77 ++++
 .../flink/lake/state/LakeSnapshotSplitState.java   |  48 +++
 .../alibaba/fluss/flink/source/FlinkSource.java    |  44 ++-
 .../fluss/flink/source/FlinkTableSource.java       |  28 +-
 .../flink/source/emitter/FlinkRecordEmitter.java   |   2 +-
 .../source/enumerator/FlinkSourceEnumerator.java   |  41 ++-
 .../flink/source/reader/FlinkSourceReader.java     |  10 +-
 .../source/reader/FlinkSourceSplitReader.java      |  13 +-
 .../flink/source/split/SourceSplitSerializer.java  |  31 +-
 .../fluss/flink/utils/FlinkConversions.java        |  16 +
 .../alibaba/fluss/flink/utils/LakeSourceUtils.java |  52 +++
 .../enumerator/FlinkSourceEnumeratorTest.java      |   3 +-
 .../flink/source/reader/FlinkSourceReaderTest.java |   3 +-
 .../source/reader/FlinkSourceSplitReaderTest.java  |   8 +-
 .../source/split/SourceSplitSerializerTest.java    |   2 +-
 fluss-test-coverage/pom.xml                        |   1 +
 26 files changed, 993 insertions(+), 55 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
index 6dca5dca5..476b1aabf 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
@@ -24,7 +24,7 @@ import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.exception.InvalidTableException;
-import com.alibaba.fluss.flink.lakehouse.LakeCatalog;
+import com.alibaba.fluss.flink.lake.LakeCatalog;
 import com.alibaba.fluss.flink.procedure.ProcedureManager;
 import com.alibaba.fluss.flink.utils.CatalogExceptionUtils;
 import com.alibaba.fluss.flink.utils.DataLakeUtils;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
index e9b5cc313..c943aa5f3 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
@@ -20,7 +20,7 @@ package com.alibaba.fluss.flink.catalog;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.flink.FlinkConnectorOptions;
-import com.alibaba.fluss.flink.lakehouse.LakeTableFactory;
+import com.alibaba.fluss.flink.lake.LakeTableFactory;
 import com.alibaba.fluss.flink.sink.FlinkTableSink;
 import com.alibaba.fluss.flink.source.FlinkTableSource;
 import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -143,7 +143,8 @@ public class FlinkTableFactory implements 
DynamicTableSourceFactory, DynamicTabl
                 cache,
                 partitionDiscoveryIntervalMs,
                 
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
-                
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+                
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
+                context.getCatalogTable().getOptions());
     }
 
     @Override
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
similarity index 98%
rename from 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java
rename to 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
index 3f42b8ff7..8c24e6e3c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
 
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
similarity index 87%
rename from 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java
rename to 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
index 5133ddec4..207422ed8 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
 
 import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
 import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
 import com.alibaba.fluss.flink.source.reader.RecordAndPos;
@@ -48,6 +49,9 @@ public class LakeRecordRecordEmitter<OUT> {
             ((PaimonSnapshotAndFlussLogSplitState) splitState)
                     .setRecordsToSkip(recordAndPos.readRecordsCount());
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
+        } else if (splitState instanceof LakeSnapshotSplitState) {
+            ((LakeSnapshotSplitState) 
splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
+            sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else {
             throw new UnsupportedOperationException(
                     "Unknown split state type: " + splitState.getClass());
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
new file mode 100644
index 000000000..8c148ccf5
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import 
com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static 
com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A generator for lake splits. */
+public class LakeSplitGenerator {
+
+    private final TableInfo tableInfo;
+    private final Admin flussAdmin;
+    private final OffsetsInitializer.BucketOffsetsRetriever 
bucketOffsetsRetriever;
+    private final OffsetsInitializer stoppingOffsetInitializer;
+    private final int bucketCount;
+
+    private final LakeSource<LakeSplit> lakeSource;
+
+    public LakeSplitGenerator(
+            TableInfo tableInfo,
+            Admin flussAdmin,
+            LakeSource<LakeSplit> lakeSource,
+            OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
+            OffsetsInitializer stoppingOffsetInitializer,
+            int bucketCount) {
+        this.tableInfo = tableInfo;
+        this.flussAdmin = flussAdmin;
+        this.lakeSource = lakeSource;
+        this.bucketOffsetsRetriever = bucketOffsetsRetriever;
+        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
+        this.bucketCount = bucketCount;
+    }
+
+    public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
+        // get the file store
+        LakeSnapshot lakeSnapshotInfo =
+                
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+        FileStoreTable fileStoreTable =
+                getTable(
+                        lakeSnapshotInfo.getSnapshotId(),
+                        
extractLakeCatalogProperties(tableInfo.getProperties()));
+
+        boolean isLogTable = !tableInfo.hasPrimaryKey();
+        boolean isPartitioned = tableInfo.isPartitioned();
+
+        Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
+                groupLakeSplits(
+                        lakeSource
+                                .createPlanner(
+                                        (LakeSource.PlannerContext) 
lakeSnapshotInfo::getSnapshotId)
+                                .plan());
+        if (isPartitioned) {
+            List<PartitionInfo> partitionInfos =
+                    
flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+            Map<Long, String> partitionNameById =
+                    partitionInfos.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            PartitionInfo::getPartitionId,
+                                            PartitionInfo::getPartitionName));
+            return generatePartitionTableSplit(
+                    lakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    partitionNameById,
+                    fileStoreTable);
+        } else {
+            Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
+                    lakeSplits.values().iterator().next();
+            // non-partitioned table
+            return generateNoPartitionedTableSplit(
+                    nonPartitionLakeSplits,
+                    isLogTable,
+                    lakeSnapshotInfo.getTableBucketsOffset(),
+                    fileStoreTable);
+        }
+    }
+
+    private Map<String, Map<Integer, List<LakeSplit>>> 
groupLakeSplits(List<LakeSplit> lakeSplits) {
+        Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
+        for (LakeSplit split : lakeSplits) {
+            String partition = String.join("$", split.partition());
+            int bucket = split.bucket();
+            // Get or create the partition group
+            Map<Integer, List<LakeSplit>> bucketMap =
+                    result.computeIfAbsent(partition, k -> new HashMap<>());
+            List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> 
new ArrayList<>());
+            splitList.add(split);
+        }
+        return result;
+    }
+
+    private List<SourceSplitBase> generatePartitionTableSplit(
+            Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Long, String> partitionNameById,
+            @Nullable FileStoreTable fileStoreTable)
+            throws Exception {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        Map<String, Long> flussPartitionIdByName =
+                partitionNameById.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getValue,
+                                        Map.Entry::getKey,
+                                        (existing, replacement) -> existing,
+                                        LinkedHashMap::new));
+        long lakeSplitPartitionId = -1L;
+
+        // iterate lake splits
+        for (Map.Entry<String, Map<Integer, List<LakeSplit>>> lakeSplitEntry :
+                lakeSplits.entrySet()) {
+            String partitionName = lakeSplitEntry.getKey();
+            Map<Integer, List<LakeSplit>> lakeSplitsOfPartition = 
lakeSplitEntry.getValue();
+            Long partitionId = flussPartitionIdByName.remove(partitionName);
+            if (partitionId != null) {
+                // mean the partition also exist in fluss partition
+                Map<Integer, Long> bucketEndOffset =
+                        stoppingOffsetInitializer.getBucketOffsets(
+                                partitionName,
+                                IntStream.range(0, bucketCount)
+                                        .boxed()
+                                        .collect(Collectors.toList()),
+                                bucketOffsetsRetriever);
+                splits.addAll(
+                        generateSplit(
+                                lakeSplitsOfPartition,
+                                partitionId,
+                                partitionName,
+                                isLogTable,
+                                tableBucketSnapshotLogOffset,
+                                bucketEndOffset,
+                                fileStoreTable));
+
+            } else {
+                // only lake data
+                splits.addAll(
+                        toLakeSnapshotSplits(
+                                lakeSplitsOfPartition,
+                                partitionName,
+                                // now, we can't get partition id for the 
partition only
+                                // in lake, set them to a arbitrary partition 
id, but
+                                // make sure different partition have 
different partition id
+                                // to enable different partition can be 
distributed to different
+                                // tasks
+                                lakeSplitPartitionId--));
+            }
+        }
+
+        // iterate remain fluss splits
+        for (Map.Entry<String, Long> partitionIdByNameEntry : 
flussPartitionIdByName.entrySet()) {
+            String partitionName = partitionIdByNameEntry.getKey();
+            long partitionId = partitionIdByNameEntry.getValue();
+            Map<Integer, Long> bucketEndOffset =
+                    stoppingOffsetInitializer.getBucketOffsets(
+                            partitionName,
+                            IntStream.range(0, 
bucketCount).boxed().collect(Collectors.toList()),
+                            bucketOffsetsRetriever);
+            splits.addAll(
+                    generateSplit(
+                            null,
+                            partitionId,
+                            partitionName,
+                            isLogTable,
+                            // pass empty map since we won't read lake splits
+                            Collections.emptyMap(),
+                            bucketEndOffset,
+                            fileStoreTable));
+        }
+        return splits;
+    }
+
+    private List<SourceSplitBase> generateSplit(
+            @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
+            @Nullable Long partitionId,
+            @Nullable String partitionName,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            Map<Integer, Long> bucketEndOffset,
+            @Nullable FileStoreTable fileStoreTable) {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        if (isLogTable) {
+            if (lakeSplits != null) {
+                splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, 
partitionId));
+            }
+            for (int bucket = 0; bucket < bucketCount; bucket++) {
+                TableBucket tableBucket =
+                        new TableBucket(tableInfo.getTableId(), partitionId, 
bucket);
+                Long snapshotLogOffset = 
tableBucketSnapshotLogOffset.get(tableBucket);
+                long stoppingOffset = bucketEndOffset.get(bucket);
+                if (snapshotLogOffset == null) {
+                    // no any data commit to this bucket, scan from fluss log
+                    splits.add(
+                            new LogSplit(
+                                    tableBucket, partitionName, 
EARLIEST_OFFSET, stoppingOffset));
+                } else {
+                    // need to read remain fluss log
+                    if (snapshotLogOffset < stoppingOffset) {
+                        splits.add(
+                                new LogSplit(
+                                        tableBucket,
+                                        partitionName,
+                                        snapshotLogOffset,
+                                        stoppingOffset));
+                    }
+                }
+            }
+        } else {
+            // it's primary key table
+            for (int bucket = 0; bucket < bucketCount; bucket++) {
+                TableBucket tableBucket =
+                        new TableBucket(tableInfo.getTableId(), partitionId, 
bucket);
+                Long snapshotLogOffset = 
tableBucketSnapshotLogOffset.get(tableBucket);
+                long stoppingOffset = bucketEndOffset.get(bucket);
+                FileStoreSourceSplitGenerator splitGenerator = new 
FileStoreSourceSplitGenerator();
+
+                splits.add(
+                        generateSplitForPrimaryKeyTableBucket(
+                                fileStoreTable,
+                                splitGenerator,
+                                tableBucket,
+                                partitionName,
+                                snapshotLogOffset,
+                                stoppingOffset));
+            }
+        }
+
+        return splits;
+    }
+
+    private List<SourceSplitBase> toLakeSnapshotSplits(
+            Map<Integer, List<LakeSplit>> lakeSplits,
+            @Nullable String partitionName,
+            @Nullable Long partitionId) {
+        List<SourceSplitBase> splits = new ArrayList<>();
+        for (LakeSplit lakeSplit :
+                
lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList()))
 {
+            TableBucket tableBucket =
+                    new TableBucket(tableInfo.getTableId(), partitionId, 
lakeSplit.bucket());
+            splits.add(new LakeSnapshotSplit(tableBucket, partitionName, 
lakeSplit));
+        }
+        return splits;
+    }
+
+    private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
+            FileStoreTable fileStoreTable,
+            FileStoreSourceSplitGenerator splitGenerator,
+            TableBucket tableBucket,
+            @Nullable String partitionName,
+            @Nullable Long snapshotLogOffset,
+            long stoppingOffset) {
+
+        // no snapshot data for this bucket or no a corresponding log offset 
in this bucket,
+        // can only scan from change log
+        if (snapshotLogOffset == null || snapshotLogOffset < 0) {
+            return new PaimonSnapshotAndFlussLogSplit(
+                    tableBucket, partitionName, null, EARLIEST_OFFSET, 
stoppingOffset);
+        }
+
+        // then, generate a split contains
+        // snapshot and change log so that we can merge change log and snapshot
+        // to get the full data
+        fileStoreTable =
+                fileStoreTable.copy(
+                        Collections.singletonMap(
+                                CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(),
+                                // we set a max size to make sure only one 
splits
+                                MemorySize.MAX_VALUE.toString()));
+        InnerTableScan tableScan =
+                fileStoreTable.newScan().withBucketFilter((b) -> b == 
tableBucket.getBucket());
+
+        if (partitionName != null) {
+            tableScan =
+                    
tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName));
+        }
+
+        List<FileStoreSourceSplit> fileStoreSourceSplits =
+                splitGenerator.createSplits(tableScan.plan());
+
+        checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key 
table must be 1.");
+        FileStoreSourceSplit fileStoreSourceSplit = 
fileStoreSourceSplits.get(0);
+        return new PaimonSnapshotAndFlussLogSplit(
+                tableBucket,
+                partitionName,
+                fileStoreSourceSplit,
+                snapshotLogOffset,
+                stoppingOffset);
+    }
+
+    private Map<String, String> getPartitionSpec(
+            FileStoreTable fileStoreTable, String partitionName) {
+        List<String> partitionKeys = fileStoreTable.partitionKeys();
+        checkState(
+                partitionKeys.size() == 1,
+                "Must only one partition key for paimon table %, but got %s, 
the partition keys are: ",
+                tableInfo.getTablePath(),
+                partitionKeys.size(),
+                partitionKeys);
+        return Collections.singletonMap(partitionKeys.get(0), partitionName);
+    }
+
+    private FileStoreTable getTable(long snapshotId, Map<String, String> 
catalogProperties)
+            throws Exception {
+        try (Catalog catalog =
+                
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) {
+            return (FileStoreTable)
+                    catalog.getTable(
+                                    Identifier.create(
+                                            
tableInfo.getTablePath().getDatabaseName(),
+                                            
tableInfo.getTablePath().getTableName()))
+                            .copy(
+                                    Collections.singletonMap(
+                                            CoreOptions.SCAN_SNAPSHOT_ID.key(),
+                                            String.valueOf(snapshotId)));
+        }
+    }
+
+    private List<SourceSplitBase> generateNoPartitionedTableSplit(
+            Map<Integer, List<LakeSplit>> lakeSplits,
+            boolean isLogTable,
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+            FileStoreTable fileStoreTable) {
+        // iterate all bucket
+        // assume bucket is from 0 to bucket count
+        Map<Integer, Long> bucketEndOffset =
+                stoppingOffsetInitializer.getBucketOffsets(
+                        null,
+                        IntStream.range(0, 
bucketCount).boxed().collect(Collectors.toList()),
+                        bucketOffsetsRetriever);
+        return generateSplit(
+                lakeSplits,
+                null,
+                null,
+                isLogTable,
+                tableBucketSnapshotLogOffset,
+                bucketEndOffset,
+                fileStoreTable);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
similarity index 84%
rename from 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java
rename to 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
index ffe7b4bd6..02e5e5e5b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
 
-import com.alibaba.fluss.client.Connection;
 import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
 import 
com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
 import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
 import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
@@ -26,6 +27,8 @@ import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
 import com.alibaba.fluss.flink.source.reader.BoundedSplitReader;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.flink.utils.DataLakeUtils;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.TablePath;
 
 import org.apache.flink.util.ExceptionUtils;
@@ -46,21 +49,21 @@ import java.util.stream.IntStream;
 public class LakeSplitReaderGenerator {
 
     private final Table table;
-    private final Connection connection;
 
     private final TablePath tablePath;
     private FileStoreTable fileStoreTable;
     private final @Nullable int[] projectedFields;
+    private final @Nullable LakeSource<LakeSplit> lakeSource;
 
     public LakeSplitReaderGenerator(
             Table table,
-            Connection connection,
             TablePath tablePath,
-            @Nullable int[] projectedFields) {
+            @Nullable int[] projectedFields,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.table = table;
-        this.connection = connection;
         this.tablePath = tablePath;
         this.projectedFields = projectedFields;
+        this.lakeSource = lakeSource;
     }
 
     public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> 
boundedSplits) {
@@ -68,6 +71,9 @@ public class LakeSplitReaderGenerator {
             boundedSplits.add(split);
         } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
             boundedSplits.add(split);
+        } else if (split instanceof LakeSnapshotSplit) {
+            boundedSplits.add(split);
+            // TODO support primary key table in 
https://github.com/apache/fluss/issues/1434
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", 
split.getClass()));
@@ -100,6 +106,13 @@ public class LakeSplitReaderGenerator {
             return new BoundedSplitReader(
                     paimonSnapshotAndLogSplitScanner,
                     paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
+        } else if (split instanceof LakeSnapshotSplit) {
+            LakeSnapshotSplit lakeSnapshotSplit = (LakeSnapshotSplit) split;
+            LakeSnapshotScanner lakeSnapshotScanner =
+                    new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
+            return new BoundedSplitReader(
+                    lakeSnapshotScanner, 
lakeSnapshotSplit.getRecordsToSplit());
+            // TODO support primary key table in 
https://github.com/apache/fluss/issues/1434
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", 
split.getClass()));
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
new file mode 100644
index 000000000..19b6f29c7
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static 
com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit.LAKE_SNAPSHOT_SPLIT_KIND;
+import static 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
+
+/** A serializer for lake split. */
+public class LakeSplitSerializer {
+
+    private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer;
+
+    public LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> 
sourceSplitSerializer) {
+        this.sourceSplitSerializer = sourceSplitSerializer;
+    }
+
+    public void serialize(DataOutputSerializer out, SourceSplitBase split) 
throws IOException {
+        if (split instanceof LakeSnapshotSplit) {
+            byte[] serializeBytes =
+                    sourceSplitSerializer.serialize(((LakeSnapshotSplit) 
split).getLakeSplit());
+            out.writeInt(serializeBytes.length);
+            out.write(serializeBytes);
+        } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
+            // TODO support primary key table in 
https://github.com/apache/fluss/issues/1434
+            FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
+                    new FileStoreSourceSplitSerializer();
+            // writing file store source split
+            PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
+                    ((PaimonSnapshotAndFlussLogSplit) split);
+            FileStoreSourceSplit fileStoreSourceSplit =
+                    paimonSnapshotAndFlussLogSplit.getSnapshotSplit();
+            if (fileStoreSourceSplit == null) {
+                // no snapshot data for the bucket
+                out.writeBoolean(false);
+            } else {
+                out.writeBoolean(true);
+                byte[] serializeBytes =
+                        
fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
+                out.writeInt(serializeBytes.length);
+                out.write(serializeBytes);
+            }
+            // writing starting/stopping offset
+            out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset());
+            out.writeLong(
+                    paimonSnapshotAndFlussLogSplit
+                            .getStoppingOffset()
+                            .orElse(LogSplit.NO_STOPPING_OFFSET));
+            out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported split type: " + split.getClass().getName());
+        }
+    }
+
+    public SourceSplitBase deserialize(
+            byte splitKind,
+            TableBucket tableBucket,
+            @Nullable String partition,
+            DataInputDeserializer input)
+            throws IOException {
+        if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
+            byte[] serializeBytes = new byte[input.readInt()];
+            input.read(serializeBytes);
+            LakeSplit fileStoreSourceSplit =
+                    sourceSplitSerializer.deserialize(
+                            sourceSplitSerializer.getVersion(), 
serializeBytes);
+            return new LakeSnapshotSplit(tableBucket, partition, 
fileStoreSourceSplit);
+            // TODO support primary key table in 
https://github.com/apache/fluss/issues/1434
+        } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
+            FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
+                    new FileStoreSourceSplitSerializer();
+            FileStoreSourceSplit fileStoreSourceSplit = null;
+            if (input.readBoolean()) {
+                byte[] serializeBytes = new byte[input.readInt()];
+                input.read(serializeBytes);
+                fileStoreSourceSplit =
+                        fileStoreSourceSplitSerializer.deserialize(
+                                fileStoreSourceSplitSerializer.getVersion(), 
serializeBytes);
+            }
+            long startingOffset = input.readLong();
+            long stoppingOffset = input.readLong();
+            long recordsToSkip = input.readLong();
+            return new PaimonSnapshotAndFlussLogSplit(
+                    tableBucket,
+                    partition,
+                    fileStoreSourceSplit,
+                    startingOffset,
+                    stoppingOffset,
+                    recordsToSkip);
+        } else {
+            throw new UnsupportedOperationException("Unsupported split kind: " 
+ splitKind);
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
similarity index 76%
rename from 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java
rename to 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
index 1fbd20a25..aae8987ad 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
 
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
 import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
 import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.flink.source.split.SourceSplitState;
 
@@ -28,10 +28,11 @@ import 
com.alibaba.fluss.flink.source.split.SourceSplitState;
 public class LakeSplitStateInitializer {
 
     public static SourceSplitState initializedState(SourceSplitBase split) {
-        if (split instanceof PaimonSnapshotSplit) {
-            return new PaimonSnapshotSplitState((PaimonSnapshotSplit) split);
-        } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
+        if (split instanceof PaimonSnapshotAndFlussLogSplit) {
             return new 
PaimonSnapshotAndFlussLogSplitState((PaimonSnapshotAndFlussLogSplit) split);
+        } else if (split instanceof LakeSnapshotSplit) {
+            return new LakeSnapshotSplitState((LakeSnapshotSplit) split);
+            // TODO support primary key table in 
https://github.com/apache/fluss/issues/1434
         } else {
             throw new UnsupportedOperationException("Unsupported split type: " 
+ split);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java
similarity index 98%
rename from 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java
rename to 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java
index f5ea9a3d3..5092a59e6 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java
new file mode 100644
index 000000000..8a51482ba
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+
+/** A scanner for reading lake split {@link LakeSnapshotSplit}. */
+public class LakeSnapshotScanner implements BatchScanner {
+
+    private final LakeSource<LakeSplit> lakeSource;
+    private final LakeSnapshotSplit lakeSnapshotSplit;
+
+    private CloseableIterator<InternalRow> rowsIterator;
+
+    public LakeSnapshotScanner(
+            LakeSource<LakeSplit> lakeSource, LakeSnapshotSplit 
lakeSnapshotSplit) {
+        this.lakeSource = lakeSource;
+        this.lakeSnapshotSplit = lakeSnapshotSplit;
+    }
+
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws 
IOException {
+        if (rowsIterator == null) {
+            rowsIterator =
+                    InternalRowIterator.wrap(
+                            lakeSource
+                                    .createRecordReader(
+                                            
(LakeSource.ReaderContext<LakeSplit>)
+                                                    
lakeSnapshotSplit::getLakeSplit)
+                                    .read());
+        }
+        return rowsIterator.hasNext() ? rowsIterator : null;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (rowsIterator != null) {
+            rowsIterator.close();
+        }
+    }
+
+    private static class InternalRowIterator implements 
CloseableIterator<InternalRow> {
+
+        private final CloseableIterator<LogRecord> recordCloseableIterator;
+
+        private static InternalRowIterator wrap(
+                CloseableIterator<LogRecord> recordCloseableIterator) {
+            return new InternalRowIterator(recordCloseableIterator);
+        }
+
+        private InternalRowIterator(CloseableIterator<LogRecord> 
recordCloseableIterator) {
+            this.recordCloseableIterator = recordCloseableIterator;
+        }
+
+        @Override
+        public void close() {
+            recordCloseableIterator.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return recordCloseableIterator.hasNext();
+        }
+
+        @Override
+        public InternalRow next() {
+            return recordCloseableIterator.next().getRow();
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java
new file mode 100644
index 000000000..b2460177a
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.split;
+
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+/** A split for reading a snapshot of lake. */
+public class LakeSnapshotSplit extends SourceSplitBase {
+
+    public static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
+
+    private final LakeSplit lakeSplit;
+
+    private final long recordsToSplit;
+
+    public LakeSnapshotSplit(
+            TableBucket tableBucket, @Nullable String partitionName, LakeSplit 
lakeSplit) {
+        this(tableBucket, partitionName, lakeSplit, 0);
+    }
+
+    public LakeSnapshotSplit(
+            TableBucket tableBucket,
+            @Nullable String partitionName,
+            LakeSplit lakeSplit,
+            long recordsToSplit) {
+        super(tableBucket, partitionName);
+        this.lakeSplit = lakeSplit;
+        this.recordsToSplit = recordsToSplit;
+    }
+
+    public LakeSplit getLakeSplit() {
+        return lakeSplit;
+    }
+
+    public long getRecordsToSplit() {
+        return recordsToSplit;
+    }
+
+    @Override
+    public String splitId() {
+        return toSplitId(
+                "lake-snapshot-",
+                new TableBucket(
+                        tableBucket.getTableId(),
+                        tableBucket.getPartitionId(),
+                        lakeSplit.bucket()));
+    }
+
+    @Override
+    public boolean isLakeSplit() {
+        return true;
+    }
+
+    @Override
+    public byte splitKind() {
+        return LAKE_SNAPSHOT_SPLIT_KIND;
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java
new file mode 100644
index 000000000..ae6eb3fa4
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.state;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.flink.source.split.SourceSplitState;
+
+/** The state of {@link LakeSnapshotSplit}. */
+public class LakeSnapshotSplitState extends SourceSplitState {
+
+    private final LakeSnapshotSplit split;
+    private long recordsToSplit;
+
+    public LakeSnapshotSplitState(LakeSnapshotSplit split) {
+        super(split);
+        this.split = split;
+        this.recordsToSplit = split.getRecordsToSplit();
+    }
+
+    public void setRecordsToSkip(long recordsToSkip) {
+        this.recordsToSplit = recordsToSkip;
+    }
+
+    @Override
+    public SourceSplitBase toSourceSplit() {
+        return new LakeSnapshotSplit(
+                split.getTableBucket(),
+                split.getPartitionName(),
+                split.getLakeSplit(),
+                recordsToSplit);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
index d806164e0..1b4f750b0 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
@@ -31,6 +31,8 @@ import 
com.alibaba.fluss.flink.source.split.SourceSplitSerializer;
 import 
com.alibaba.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
 import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
 import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.types.RowType;
 
@@ -70,6 +72,8 @@ public class FlinkSource<OUT>
 
     private final List<FieldEqual> partitionFilters;
 
+    private final @Nullable LakeSource<LakeSplit> lakeSource;
+
     public FlinkSource(
             Configuration flussConf,
             TablePath tablePath,
@@ -82,6 +86,34 @@ public class FlinkSource<OUT>
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
             List<FieldEqual> partitionFilters) {
+        this(
+                flussConf,
+                tablePath,
+                hasPrimaryKey,
+                isPartitioned,
+                sourceOutputType,
+                projectedFields,
+                offsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                deserializationSchema,
+                streaming,
+                partitionFilters,
+                null);
+    }
+
+    public FlinkSource(
+            Configuration flussConf,
+            TablePath tablePath,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            RowType sourceOutputType,
+            @Nullable int[] projectedFields,
+            OffsetsInitializer offsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            FlussDeserializationSchema<OUT> deserializationSchema,
+            boolean streaming,
+            List<FieldEqual> partitionFilters,
+            LakeSource<LakeSplit> lakeSource) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
         this.hasPrimaryKey = hasPrimaryKey;
@@ -93,6 +125,7 @@ public class FlinkSource<OUT>
         this.deserializationSchema = deserializationSchema;
         this.streaming = streaming;
         this.partitionFilters = checkNotNull(partitionFilters);
+        this.lakeSource = lakeSource;
     }
 
     @Override
@@ -112,7 +145,8 @@ public class FlinkSource<OUT>
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
-                partitionFilters);
+                partitionFilters,
+                lakeSource);
     }
 
     @Override
@@ -130,12 +164,13 @@ public class FlinkSource<OUT>
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
-                partitionFilters);
+                partitionFilters,
+                lakeSource);
     }
 
     @Override
     public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
-        return SourceSplitSerializer.INSTANCE;
+        return new SourceSplitSerializer(lakeSource);
     }
 
     @Override
@@ -166,7 +201,8 @@ public class FlinkSource<OUT>
                 context,
                 projectedFields,
                 flinkSourceReaderMetrics,
-                recordEmitter);
+                recordEmitter,
+                lakeSource);
     }
 
     @Override
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
index 65e8ac813..deb986a50 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
@@ -28,6 +28,8 @@ import 
com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
 import com.alibaba.fluss.flink.utils.FlinkConversions;
 import com.alibaba.fluss.flink.utils.PushdownUtils;
 import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.MergeEngineType;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.types.RowType;
@@ -76,6 +78,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static com.alibaba.fluss.flink.utils.LakeSourceUtils.createLakeSource;
 import static 
com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
 import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
@@ -130,6 +133,10 @@ public class FlinkTableSource
 
     private List<FieldEqual> partitionFilters = Collections.emptyList();
 
+    private final Map<String, String> tableOptions;
+
+    @Nullable private LakeSource<LakeSplit> lakeSource;
+
     public FlinkTableSource(
             TablePath tablePath,
             Configuration flussConfig,
@@ -144,7 +151,8 @@ public class FlinkTableSource
             @Nullable LookupCache cache,
             long scanPartitionDiscoveryIntervalMs,
             boolean isDataLakeEnabled,
-            @Nullable MergeEngineType mergeEngineType) {
+            @Nullable MergeEngineType mergeEngineType,
+            Map<String, String> tableOptions) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableOutputType = tableOutputType;
@@ -162,6 +170,10 @@ public class FlinkTableSource
         this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
         this.isDataLakeEnabled = isDataLakeEnabled;
         this.mergeEngineType = mergeEngineType;
+        this.tableOptions = tableOptions;
+        if (isDataLakeEnabled) {
+            this.lakeSource = createLakeSource(tablePath, tableOptions);
+        }
     }
 
     @Override
@@ -270,7 +282,8 @@ public class FlinkTableSource
                         scanPartitionDiscoveryIntervalMs,
                         new RowDataDeserializationSchema(),
                         streaming,
-                        partitionFilters);
+                        partitionFilters,
+                        lakeSource);
 
         if (!streaming) {
             // return a bounded source provide to make planner happy,
@@ -359,12 +372,14 @@ public class FlinkTableSource
                         cache,
                         scanPartitionDiscoveryIntervalMs,
                         isDataLakeEnabled,
-                        mergeEngineType);
+                        mergeEngineType,
+                        tableOptions);
         source.producedDataType = producedDataType;
         source.projectedFields = projectedFields;
         source.singleRowFilter = singleRowFilter;
         source.modificationScanType = modificationScanType;
         source.partitionFilters = partitionFilters;
+        source.lakeSource = lakeSource;
         return source;
     }
 
@@ -382,10 +397,17 @@ public class FlinkTableSource
     public void applyProjection(int[][] projectedFields, DataType 
producedDataType) {
         this.projectedFields = Arrays.stream(projectedFields).mapToInt(value 
-> value[0]).toArray();
         this.producedDataType = producedDataType.getLogicalType();
+        if (lakeSource != null) {
+            lakeSource.withProject(projectedFields);
+        }
     }
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
+        if (lakeSource != null) {
+            // todo: use real filters
+        }
+
         List<ResolvedExpression> acceptedFilters = new ArrayList<>();
         List<ResolvedExpression> remainingFilters = new ArrayList<>();
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java
index 3e62fdd0e..84602ca35 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java
@@ -18,7 +18,7 @@
 package com.alibaba.fluss.flink.source.emitter;
 
 import com.alibaba.fluss.client.table.scanner.ScanRecord;
-import com.alibaba.fluss.flink.lakehouse.LakeRecordRecordEmitter;
+import com.alibaba.fluss.flink.lake.LakeRecordRecordEmitter;
 import com.alibaba.fluss.flink.source.deserializer.FlussDeserializationSchema;
 import com.alibaba.fluss.flink.source.reader.FlinkSourceReader;
 import com.alibaba.fluss.flink.source.reader.RecordAndPos;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index e65e64464..05ac7fa6e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -23,7 +23,7 @@ import com.alibaba.fluss.client.admin.Admin;
 import com.alibaba.fluss.client.metadata.KvSnapshots;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.flink.lakehouse.LakeSplitGenerator;
+import com.alibaba.fluss.flink.lake.LakeSplitGenerator;
 import 
com.alibaba.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
 import 
com.alibaba.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import 
com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -36,6 +36,8 @@ import com.alibaba.fluss.flink.source.split.LogSplit;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
 import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.PartitionInfo;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableInfo;
@@ -130,6 +132,8 @@ public class FlinkSourceEnumerator
 
     private final List<FieldEqual> partitionFilters;
 
+    @Nullable private final LakeSource<LakeSplit> lakeSource;
+
     public FlinkSourceEnumerator(
             TablePath tablePath,
             Configuration flussConf,
@@ -140,6 +144,30 @@ public class FlinkSourceEnumerator
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
             List<FieldEqual> partitionFilters) {
+        this(
+                tablePath,
+                flussConf,
+                hasPrimaryKey,
+                isPartitioned,
+                context,
+                startingOffsetsInitializer,
+                scanPartitionDiscoveryIntervalMs,
+                streaming,
+                partitionFilters,
+                null);
+    }
+
+    public FlinkSourceEnumerator(
+            TablePath tablePath,
+            Configuration flussConf,
+            boolean hasPrimaryKey,
+            boolean isPartitioned,
+            SplitEnumeratorContext<SourceSplitBase> context,
+            OffsetsInitializer startingOffsetsInitializer,
+            long scanPartitionDiscoveryIntervalMs,
+            boolean streaming,
+            List<FieldEqual> partitionFilters,
+            LakeSource<LakeSplit> lakeSource) {
         this(
                 tablePath,
                 flussConf,
@@ -151,7 +179,8 @@ public class FlinkSourceEnumerator
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
-                partitionFilters);
+                partitionFilters,
+                lakeSource);
     }
 
     public FlinkSourceEnumerator(
@@ -165,7 +194,8 @@ public class FlinkSourceEnumerator
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            List<FieldEqual> partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
         this.hasPrimaryKey = hasPrimaryKey;
@@ -180,6 +210,7 @@ public class FlinkSourceEnumerator
         this.partitionFilters = checkNotNull(partitionFilters);
         this.stoppingOffsetsInitializer =
                 streaming ? new NoStoppingOffsetsInitializer() : 
OffsetsInitializer.latest();
+        this.lakeSource = lakeSource;
     }
 
     @Override
@@ -485,10 +516,12 @@ public class FlinkSourceEnumerator
                 new LakeSplitGenerator(
                         tableInfo,
                         flussAdmin,
+                        lakeSource,
                         bucketOffsetsRetriever,
                         stoppingOffsetsInitializer,
                         tableInfo.getNumBuckets());
-        return lakeSplitGenerator.generateLakeSplits();
+        List<SourceSplitBase> lakeSplits = 
lakeSplitGenerator.generateHybridLakeSplits();
+        return lakeSplits;
     }
 
     private boolean ignoreTableBucket(TableBucket tableBucket) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
index ba7f6642a..d2db9ac8c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
@@ -18,7 +18,7 @@
 package com.alibaba.fluss.flink.source.reader;
 
 import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.flink.lakehouse.LakeSplitStateInitializer;
+import com.alibaba.fluss.flink.lake.LakeSplitStateInitializer;
 import com.alibaba.fluss.flink.source.emitter.FlinkRecordEmitter;
 import com.alibaba.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
 import com.alibaba.fluss.flink.source.event.PartitionsRemovedEvent;
@@ -28,6 +28,8 @@ import 
com.alibaba.fluss.flink.source.split.HybridSnapshotLogSplitState;
 import com.alibaba.fluss.flink.source.split.LogSplitState;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.flink.source.split.SourceSplitState;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.types.RowType;
@@ -57,7 +59,8 @@ public class FlinkSourceReader<OUT>
             SourceReaderContext context,
             @Nullable int[] projectedFields,
             FlinkSourceReaderMetrics flinkSourceReaderMetrics,
-            FlinkRecordEmitter<OUT> recordEmitter) {
+            FlinkRecordEmitter<OUT> recordEmitter,
+            LakeSource<LakeSplit> lakeSource) {
         super(
                 elementsQueue,
                 new FlinkSourceFetcherManager(
@@ -68,7 +71,8 @@ public class FlinkSourceReader<OUT>
                                         tablePath,
                                         sourceOutputType,
                                         projectedFields,
-                                        flinkSourceReaderMetrics),
+                                        flinkSourceReaderMetrics,
+                                        lakeSource),
                         (ignore) -> {}),
                 recordEmitter,
                 context.getConfiguration(),
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
index 55c57c79c..ac18b5283 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -26,13 +26,15 @@ import 
com.alibaba.fluss.client.table.scanner.log.LogScanner;
 import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.PartitionNotExistException;
-import com.alibaba.fluss.flink.lakehouse.LakeSplitReaderGenerator;
+import com.alibaba.fluss.flink.lake.LakeSplitReaderGenerator;
 import com.alibaba.fluss.flink.metrics.FlinkMetricRegistry;
 import com.alibaba.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
 import com.alibaba.fluss.flink.source.split.HybridSnapshotLogSplit;
 import com.alibaba.fluss.flink.source.split.LogSplit;
 import com.alibaba.fluss.flink.source.split.SnapshotSplit;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.types.RowType;
@@ -99,6 +101,8 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
     private final Table table;
     private final FlinkMetricRegistry flinkMetricRegistry;
 
+    @Nullable private LakeSource<LakeSplit> lakeSource;
+
     // table id, will be null when haven't received any split
     private Long tableId;
 
@@ -116,7 +120,8 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
             TablePath tablePath,
             RowType sourceOutputType,
             @Nullable int[] projectedFields,
-            FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
+            FlinkSourceReaderMetrics flinkSourceReaderMetrics,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.flinkMetricRegistry =
                 new 
FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
         this.connection = ConnectionFactory.createConnection(flussConf, 
flinkMetricRegistry);
@@ -131,6 +136,7 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
         this.logScanner = 
table.newScan().project(projectedFields).createLogScanner();
         this.stoppingOffsets = new HashMap<>();
         this.emptyLogSplits = new HashSet<>();
+        this.lakeSource = lakeSource;
     }
 
     @Override
@@ -216,7 +222,8 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
     private LakeSplitReaderGenerator getLakeSplitReader() {
         if (lakeSplitReaderGenerator == null) {
             lakeSplitReaderGenerator =
-                    new LakeSplitReaderGenerator(table, connection, tablePath, 
projectedFields);
+                    new LakeSplitReaderGenerator(
+                            table, tablePath, projectedFields, 
checkNotNull(lakeSource));
         }
         return lakeSplitReaderGenerator;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java
index dda093086..cdbbd3216 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java
@@ -17,20 +17,24 @@
 
 package com.alibaba.fluss.flink.source.split;
 
-import com.alibaba.fluss.flink.lakehouse.LakeSplitSerializer;
+import com.alibaba.fluss.flink.lake.LakeSplitSerializer;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
 import com.alibaba.fluss.metadata.TableBucket;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
 /** A serializer for the {@link SourceSplitBase}. */
 public class SourceSplitSerializer implements 
SimpleVersionedSerializer<SourceSplitBase> {
 
-    public static final SourceSplitSerializer INSTANCE = new 
SourceSplitSerializer();
-
     private static final int VERSION_0 = 0;
 
     private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
@@ -41,7 +45,11 @@ public class SourceSplitSerializer implements 
SimpleVersionedSerializer<SourceSp
 
     private static final int CURRENT_VERSION = VERSION_0;
 
-    private LakeSplitSerializer lakeSplitSerializer;
+    @Nullable private final LakeSource<LakeSplit> lakeSource;
+
+    public SourceSplitSerializer(LakeSource<LakeSplit> lakeSource) {
+        this.lakeSource = lakeSource;
+    }
 
     @Override
     public int getVersion() {
@@ -75,7 +83,9 @@ public class SourceSplitSerializer implements 
SimpleVersionedSerializer<SourceSp
                 
out.writeLong(logSplit.getStoppingOffset().orElse(LogSplit.NO_STOPPING_OFFSET));
             }
         } else {
-            getLakeSplitSerializer().serialize(out, split);
+            LakeSplitSerializer lakeSplitSerializer =
+                    new 
LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer());
+            lakeSplitSerializer.serialize(out, split);
         }
 
         final byte[] result = out.getCopyOfBuffer();
@@ -135,14 +145,9 @@ public class SourceSplitSerializer implements 
SimpleVersionedSerializer<SourceSp
             long stoppingOffset = in.readLong();
             return new LogSplit(tableBucket, partitionName, startingOffset, 
stoppingOffset);
         } else {
-            return getLakeSplitSerializer().deserialize(splitKind, 
tableBucket, partitionName, in);
-        }
-    }
-
-    private LakeSplitSerializer getLakeSplitSerializer() {
-        if (lakeSplitSerializer == null) {
-            lakeSplitSerializer = new LakeSplitSerializer();
+            LakeSplitSerializer lakeSplitSerializer =
+                    new 
LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer());
+            return lakeSplitSerializer.deserialize(splitKind, tableBucket, 
partitionName, in);
         }
-        return lakeSplitSerializer;
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
index da65bd646..1c95cf87e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
@@ -24,6 +24,7 @@ import com.alibaba.fluss.config.MemorySize;
 import com.alibaba.fluss.config.Password;
 import com.alibaba.fluss.flink.FlinkConnectorOptions;
 import com.alibaba.fluss.flink.catalog.FlinkCatalogFactory;
+import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableDescriptor;
@@ -51,6 +52,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
@@ -94,6 +96,20 @@ public class FlinkConversions {
         // put fluss table properties into flink options, to make the 
properties visible to users
         
convertFlussTablePropertiesToFlinkOptions(tableInfo.getProperties().toMap(), 
newOptions);
 
+        // put lake related options to table options
+        Optional<DataLakeFormat> optDataLakeFormat = 
tableInfo.getTableConfig().getDataLakeFormat();
+        if (optDataLakeFormat.isPresent()) {
+            DataLakeFormat dataLakeFormat = optDataLakeFormat.get();
+            String dataLakePrefix = "table.datalake." + dataLakeFormat + ".";
+
+            for (Map.Entry<String, String> tableProperty :
+                    tableInfo.getProperties().toMap().entrySet()) {
+                if (tableProperty.getKey().startsWith(dataLakePrefix)) {
+                    newOptions.put(tableProperty.getKey(), 
tableProperty.getValue());
+                }
+            }
+        }
+
         org.apache.flink.table.api.Schema.Builder schemaBuilder =
                 org.apache.flink.table.api.Schema.newBuilder();
         if (tableInfo.hasPrimaryKey()) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java
new file mode 100644
index 000000000..6b849e243
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.utils;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lakestorage.LakeStorage;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TablePath;
+
+import java.util.Map;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** Utils for create lake source. */
+public class LakeSourceUtils {
+
+    @SuppressWarnings("unchecked")
+    public static LakeSource<LakeSplit> createLakeSource(
+            TablePath tablePath, Map<String, String> properties) {
+        Map<String, String> catalogProperties =
+                
DataLakeUtils.extractLakeCatalogProperties(Configuration.fromMap(properties));
+        Configuration lakeConfig = Configuration.fromMap(catalogProperties);
+
+        String dataLake =
+                Configuration.fromMap(properties)
+                        .get(ConfigOptions.TABLE_DATALAKE_FORMAT)
+                        .toString();
+        LakeStoragePlugin lakeStoragePlugin =
+                LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
+        LakeStorage lakeStorage = 
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
+        return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index e518f7b05..a7594a0b5 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -356,7 +356,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             OffsetsInitializer.earliest(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            Collections.emptyList(),
+                            null);
 
             enumerator.start();
             assertThat(context.getSplitsAssignmentSequence()).isEmpty();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
index d29749853..4ce072631 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
@@ -180,6 +180,7 @@ class FlinkSourceReaderTest extends FlinkTestBase {
                 context,
                 null,
                 new FlinkSourceReaderMetrics(context.metricGroup()),
-                recordEmitter);
+                recordEmitter,
+                null);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index 29ead7607..4984b7d7b 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -89,7 +89,8 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
                                                 DataTypes.FIELD("name", 
DataTypes.STRING()),
                                                 DataTypes.FIELD("age", 
DataTypes.INT())),
                                         null,
-                                        createMockSourceReaderMetrics()))
+                                        createMockSourceReaderMetrics(),
+                                        null))
                 .isInstanceOf(ValidationException.class)
                 .hasMessage(
                         "The Flink query schema is not matched to Fluss table 
schema. \n"
@@ -106,7 +107,8 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
                                                         "id", 
DataTypes.BIGINT().copy(false)),
                                                 DataTypes.FIELD("name", 
DataTypes.STRING())),
                                         new int[] {1, 0},
-                                        createMockSourceReaderMetrics()))
+                                        createMockSourceReaderMetrics(),
+                                        null))
                 .isInstanceOf(ValidationException.class)
                 .hasMessage(
                         "The Flink query schema is not matched to Fluss table 
schema. \n"
@@ -394,7 +396,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
 
     private FlinkSourceSplitReader createSplitReader(TablePath tablePath, 
RowType rowType) {
         return new FlinkSourceSplitReader(
-                clientConf, tablePath, rowType, null, 
createMockSourceReaderMetrics());
+                clientConf, tablePath, rowType, null, 
createMockSourceReaderMetrics(), null);
     }
 
     private FlinkSourceReaderMetrics createMockSourceReaderMetrics() {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java
index da6e7830b..7491eb568 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java
@@ -31,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 class SourceSplitSerializerTest {
 
-    private static final SourceSplitSerializer serializer = 
SourceSplitSerializer.INSTANCE;
+    private static final SourceSplitSerializer serializer = new 
SourceSplitSerializer(null);
     private static final TableBucket tableBucket = new TableBucket(1, 2);
     private static final TableBucket partitionedTableBucket = new 
TableBucket(1, 100L, 2);
 
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index d4015908b..aa565fbaa 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -319,6 +319,7 @@
                                         
<exclude>com.alibaba.fluss.metrics.*</exclude>
                                         <!-- end exclude for metric -->
                                         
<exclude>com.alibaba.fluss.flink.lakehouse.*</exclude>
+                                        
<exclude>com.alibaba.fluss.flink.lake.*</exclude>
                                         
<exclude>com.alibaba.fluss.kafka.*</exclude>
                                         <!-- exclude for fluss-ci-tools -->
                                         
<exclude>com.alibaba.fluss.tools.ci.*</exclude>

Reply via email to