luoyuxia commented on code in PR #1527: URL: https://github.com/apache/fluss/pull/1527#discussion_r2271918188
########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java: ########## @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. Review Comment: Now you need to use the Apache license. See https://fluss.apache.org/community/dev/ide-setup/#copyright-profile ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java: ########## @@ -32,6 +34,9 @@ public static SourceSplitState initializedState(SourceSplitBase split) { return new PaimonSnapshotSplitState((PaimonSnapshotSplit) split); Review Comment: We can also remove ``` if (split instanceof PaimonSnapshotSplit) { return new PaimonSnapshotSplitState((PaimonSnapshotSplit) split); } ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java: ########## @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; + +/** An {@link InternalRow} with the key part. */ +public class KeyValueRow { Review Comment: Remove this in this pr ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java: ########## @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +/** . 
*/ +public class SortMergeReader { Review Comment: Remove this in this pr ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java: ########## @@ -46,28 +49,31 @@ public class LakeSplitReaderGenerator { private final Table table; - private final Connection connection; private final TablePath tablePath; private FileStoreTable fileStoreTable; private final @Nullable int[] projectedFields; + private final @Nullable LakeSource<LakeSplit> lakeSource; public LakeSplitReaderGenerator( Table table, - Connection connection, TablePath tablePath, - @Nullable int[] projectedFields) { + @Nullable int[] projectedFields, + @Nullable LakeSource<LakeSplit> lakeSource) { this.table = table; - this.connection = connection; this.tablePath = tablePath; this.projectedFields = projectedFields; + this.lakeSource = lakeSource; } public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) { if (split instanceof PaimonSnapshotSplit) { Review Comment: I think we can remove this now ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java: ########## @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; + +/** . */ Review Comment: Please add comments for this class. ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java: ########## @@ -485,10 +516,13 @@ private List<SourceSplitBase> getLakeSplit() throws Exception { new LakeSplitGenerator( tableInfo, flussAdmin, + lakeSource, bucketOffsetsRetriever, stoppingOffsetsInitializer, tableInfo.getNumBuckets()); - return lakeSplitGenerator.generateLakeSplits(); + List<SourceSplitBase> lakeSplits = lakeSplitGenerator.generateHybridLakeSplits(); + System.out.println(lakeSplits); Review Comment: remove this. 
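For reference, a minimal sketch (illustrative, not part of the PR) of what the tail of `getLakeSplit()` would look like once the debug output above is removed:

```java
// Generate the hybrid lake splits and return them directly,
// without the System.out.println debug statement.
List<SourceSplitBase> lakeSplits = lakeSplitGenerator.generateHybridLakeSplits();
return lakeSplits;
```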
########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java: ########## @@ -70,6 +72,8 @@ public class FlinkSource<OUT> private final List<FieldEqual> partitionFilters; + final @Nullable LakeSource<LakeSplit> lakeSource; Review Comment: nit: ```suggestion private final @Nullable LakeSource<LakeSplit> lakeSource; ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java: ########## @@ -130,6 +133,10 @@ public class FlinkTableSource private List<FieldEqual> partitionFilters = Collections.emptyList(); + private final Map<String, String> tableOptions; + + @Nullable private LakeSource<LakeSplit> lakeSource; Review Comment: nit: ```suggestion @Nullable private final LakeSource<LakeSplit> lakeSource; ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java: ########## @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.client.admin.Admin; +import com.alibaba.fluss.client.metadata.LakeSnapshot; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer; +import com.alibaba.fluss.flink.source.split.LogSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableInfo; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.InnerTableScan; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties; +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** A generator for lake splits. 
*/ +public class LakeSplitGenerator { + + private final TableInfo tableInfo; + private final Admin flussAdmin; + private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever; + private final OffsetsInitializer stoppingOffsetInitializer; + private final int bucketCount; + + private final LakeSource<LakeSplit> lakeSource; + + public LakeSplitGenerator( + TableInfo tableInfo, + Admin flussAdmin, + LakeSource<LakeSplit> lakeSource, + OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever, + OffsetsInitializer stoppingOffsetInitializer, + int bucketCount) { + this.tableInfo = tableInfo; + this.flussAdmin = flussAdmin; + this.lakeSource = lakeSource; + this.bucketOffsetsRetriever = bucketOffsetsRetriever; + this.stoppingOffsetInitializer = stoppingOffsetInitializer; + this.bucketCount = bucketCount; + } + + public List<SourceSplitBase> generateHybridLakeSplits() throws Exception { + // get the file store + LakeSnapshot lakeSnapshotInfo = + flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get(); + FileStoreTable fileStoreTable = + getTable( + lakeSnapshotInfo.getSnapshotId(), + extractLakeCatalogProperties(tableInfo.getProperties())); + + boolean isLogTable = !tableInfo.hasPrimaryKey(); + boolean isPartitioned = tableInfo.isPartitioned(); + + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits = + groupLakeSplits( + lakeSource + .createPlanner( + (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId) + .plan()); + if (isPartitioned) { + List<PartitionInfo> partitionInfos = + flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get(); + Map<Long, String> partitionNameById = + partitionInfos.stream() + .collect( + Collectors.toMap( + PartitionInfo::getPartitionId, + PartitionInfo::getPartitionName)); + return generatePartitionTableSplit( + lakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + partitionNameById, + fileStoreTable); + } else { + Map<Integer, List<LakeSplit>> nonPartitionLakeSplits = + lakeSplits.values().iterator().next(); + // non-partitioned table + return generateNoPartitionedTableSplit( + nonPartitionLakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + fileStoreTable); + } + } + + private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) { + Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>(); + for (LakeSplit split : lakeSplits) { + String partition = String.join("$", split.partition()); + int bucket = split.bucket(); + // Get or create the partition group + Map<Integer, List<LakeSplit>> bucketMap = + result.computeIfAbsent(partition, k -> new HashMap<>()); + List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()); + splitList.add(split); + } + return result; + } + + private List<SourceSplitBase> generatePartitionTableSplit( + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Long, String> partitionNameById, + @Nullable FileStoreTable fileStoreTable) + throws Exception { + List<SourceSplitBase> splits = new ArrayList<>(); + for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) { + long partitionId = partitionNameByIdEntry.getKey(); + String partitionName = partitionNameByIdEntry.getValue(); + Map<Integer, Long> bucketEndOffset = + stoppingOffsetInitializer.getBucketOffsets( + partitionName, + IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()), + bucketOffsetsRetriever); + 
splits.addAll( + generateSplit( + lakeSplits.get(partitionName), + partitionId, + partitionName, + isLogTable, + tableBucketSnapshotLogOffset, + bucketEndOffset, + fileStoreTable)); + } + return splits; + } + + private List<SourceSplitBase> generateSplit( + @Nullable Map<Integer, List<LakeSplit>> lakeSplits, + @Nullable Long partitionId, + @Nullable String partitionName, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Integer, Long> bucketEndOffset, + @Nullable FileStoreTable fileStoreTable) { + List<SourceSplitBase> splits = new ArrayList<>(); + if (isLogTable) { + // it's log table, we don't care about bucket, and we can't get bucket in paimon's Review Comment: the comments are out of date, remove this ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java: ########## @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.client.admin.Admin; +import com.alibaba.fluss.client.metadata.LakeSnapshot; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer; +import com.alibaba.fluss.flink.source.split.LogSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableInfo; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.InnerTableScan; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties; +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** A generator for lake splits. 
*/ +public class LakeSplitGenerator { + + private final TableInfo tableInfo; + private final Admin flussAdmin; + private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever; + private final OffsetsInitializer stoppingOffsetInitializer; + private final int bucketCount; + + private final LakeSource<LakeSplit> lakeSource; + + public LakeSplitGenerator( + TableInfo tableInfo, + Admin flussAdmin, + LakeSource<LakeSplit> lakeSource, + OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever, + OffsetsInitializer stoppingOffsetInitializer, + int bucketCount) { + this.tableInfo = tableInfo; + this.flussAdmin = flussAdmin; + this.lakeSource = lakeSource; + this.bucketOffsetsRetriever = bucketOffsetsRetriever; + this.stoppingOffsetInitializer = stoppingOffsetInitializer; + this.bucketCount = bucketCount; + } + + public List<SourceSplitBase> generateHybridLakeSplits() throws Exception { + // get the file store + LakeSnapshot lakeSnapshotInfo = + flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get(); + FileStoreTable fileStoreTable = + getTable( + lakeSnapshotInfo.getSnapshotId(), + extractLakeCatalogProperties(tableInfo.getProperties())); + + boolean isLogTable = !tableInfo.hasPrimaryKey(); + boolean isPartitioned = tableInfo.isPartitioned(); + + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits = + groupLakeSplits( + lakeSource + .createPlanner( + (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId) + .plan()); + if (isPartitioned) { + List<PartitionInfo> partitionInfos = + flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get(); + Map<Long, String> partitionNameById = + partitionInfos.stream() + .collect( + Collectors.toMap( + PartitionInfo::getPartitionId, + PartitionInfo::getPartitionName)); + return generatePartitionTableSplit( + lakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + partitionNameById, + fileStoreTable); + } else { + Map<Integer, List<LakeSplit>> nonPartitionLakeSplits = + lakeSplits.values().iterator().next(); + // non-partitioned table + return generateNoPartitionedTableSplit( + nonPartitionLakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + fileStoreTable); + } + } + + private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) { + Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>(); + for (LakeSplit split : lakeSplits) { + String partition = String.join("$", split.partition()); + int bucket = split.bucket(); + // Get or create the partition group + Map<Integer, List<LakeSplit>> bucketMap = + result.computeIfAbsent(partition, k -> new HashMap<>()); + List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()); + splitList.add(split); + } + return result; + } + + private List<SourceSplitBase> generatePartitionTableSplit( + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Long, String> partitionNameById, + @Nullable FileStoreTable fileStoreTable) + throws Exception { + List<SourceSplitBase> splits = new ArrayList<>(); + for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) { + long partitionId = partitionNameByIdEntry.getKey(); + String partitionName = partitionNameByIdEntry.getValue(); + Map<Integer, Long> bucketEndOffset = + stoppingOffsetInitializer.getBucketOffsets( + partitionName, + IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()), + bucketOffsetsRetriever); + 
splits.addAll( + generateSplit( + lakeSplits.get(partitionName), + partitionId, + partitionName, + isLogTable, + tableBucketSnapshotLogOffset, + bucketEndOffset, + fileStoreTable)); + } + return splits; + } + + private List<SourceSplitBase> generateSplit( + @Nullable Map<Integer, List<LakeSplit>> lakeSplits, + @Nullable Long partitionId, + @Nullable String partitionName, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Integer, Long> bucketEndOffset, + @Nullable FileStoreTable fileStoreTable) { + List<SourceSplitBase> splits = new ArrayList<>(); + if (isLogTable) { + // it's log table, we don't care about bucket, and we can't get bucket in paimon's + // dynamic bucket; so first generate split for the whole paimon snapshot, + // then generate log split for each bucket paimon snapshot + fluss log + splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId)); Review Comment: Argument `lakeSplits` might be null; skip generating lake snapshot splits when `lakeSplits` is null? ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java: ########## @@ -95,6 +95,8 @@ record = deserializationSchema.deserialize(scanRecord); e); } + System.out.println("record: " + record); Review Comment: remove this ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java: ########## @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.client.admin.Admin; +import com.alibaba.fluss.client.metadata.LakeSnapshot; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer; +import com.alibaba.fluss.flink.source.split.LogSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableInfo; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.InnerTableScan; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties; +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** A generator for lake splits. */ +public class LakeSplitGenerator { + + private final TableInfo tableInfo; + private final Admin flussAdmin; + private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever; + private final OffsetsInitializer stoppingOffsetInitializer; + private final int bucketCount; + + private final LakeSource<LakeSplit> lakeSource; + + public LakeSplitGenerator( + TableInfo tableInfo, + Admin flussAdmin, + LakeSource<LakeSplit> lakeSource, + OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever, + OffsetsInitializer stoppingOffsetInitializer, + int bucketCount) { + this.tableInfo = tableInfo; + this.flussAdmin = flussAdmin; + this.lakeSource = lakeSource; + this.bucketOffsetsRetriever = bucketOffsetsRetriever; + this.stoppingOffsetInitializer = stoppingOffsetInitializer; + this.bucketCount = bucketCount; + } + + public List<SourceSplitBase> generateHybridLakeSplits() throws Exception { + // get the file store + LakeSnapshot lakeSnapshotInfo = + flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get(); + FileStoreTable fileStoreTable = + getTable( + lakeSnapshotInfo.getSnapshotId(), + extractLakeCatalogProperties(tableInfo.getProperties())); + + boolean isLogTable = !tableInfo.hasPrimaryKey(); + boolean isPartitioned = tableInfo.isPartitioned(); + + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits = + groupLakeSplits( + lakeSource + .createPlanner( + (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId) + .plan()); + if (isPartitioned) { + List<PartitionInfo> partitionInfos = + flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get(); + Map<Long, String> partitionNameById = + partitionInfos.stream() + .collect( + Collectors.toMap( + PartitionInfo::getPartitionId, + PartitionInfo::getPartitionName)); + return 
generatePartitionTableSplit( + lakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + partitionNameById, + fileStoreTable); + } else { + Map<Integer, List<LakeSplit>> nonPartitionLakeSplits = + lakeSplits.values().iterator().next(); + // non-partitioned table + return generateNoPartitionedTableSplit( + nonPartitionLakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + fileStoreTable); + } + } + + private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) { + Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>(); + for (LakeSplit split : lakeSplits) { + String partition = String.join("$", split.partition()); + int bucket = split.bucket(); + // Get or create the partition group + Map<Integer, List<LakeSplit>> bucketMap = + result.computeIfAbsent(partition, k -> new HashMap<>()); + List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()); + splitList.add(split); + } + return result; + } + + private List<SourceSplitBase> generatePartitionTableSplit( + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Long, String> partitionNameById, + @Nullable FileStoreTable fileStoreTable) + throws Exception { + List<SourceSplitBase> splits = new ArrayList<>(); + for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) { + long partitionId = partitionNameByIdEntry.getKey(); + String partitionName = partitionNameByIdEntry.getValue(); + Map<Integer, Long> bucketEndOffset = + stoppingOffsetInitializer.getBucketOffsets( + partitionName, + IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()), + bucketOffsetsRetriever); + splits.addAll( + generateSplit( + lakeSplits.get(partitionName), + partitionId, + partitionName, + isLogTable, + tableBucketSnapshotLogOffset, + bucketEndOffset, + fileStoreTable)); + } + return splits; + } + + private List<SourceSplitBase> generateSplit( + @Nullable Map<Integer, List<LakeSplit>> lakeSplits, + @Nullable Long partitionId, + @Nullable String partitionName, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Integer, Long> bucketEndOffset, + @Nullable FileStoreTable fileStoreTable) { + List<SourceSplitBase> splits = new ArrayList<>(); + if (isLogTable) { + // it's log table, we don't care about bucket, and we can't get bucket in paimon's + // dynamic bucket; so first generate split for the whole paimon snapshot, + // then generate log split for each bucket paimon snapshot + fluss log + splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId)); + for (int bucket = 0; bucket < bucketCount; bucket++) { + TableBucket tableBucket = + new TableBucket(tableInfo.getTableId(), partitionId, bucket); + Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket); + long stoppingOffset = bucketEndOffset.get(bucket); + if (snapshotLogOffset == null) { + // no any data commit to this bucket, scan from fluss log + splits.add( + new LogSplit( + tableBucket, partitionName, EARLIEST_OFFSET, stoppingOffset)); + } else { + // need to read remain fluss log + if (snapshotLogOffset < stoppingOffset) { + splits.add( + new LogSplit( + tableBucket, + partitionName, + snapshotLogOffset, + stoppingOffset)); + } + } + } + } else { + // it's primary key table + for (int bucket = 0; bucket < bucketCount; bucket++) { + TableBucket tableBucket = + new TableBucket(tableInfo.getTableId(), partitionId, bucket); + Long 
snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket); + long stoppingOffset = bucketEndOffset.get(bucket); + FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator(); + + splits.add( + generateSplitForPrimaryKeyTableBucket( + fileStoreTable, + splitGenerator, + tableBucket, + partitionName, + snapshotLogOffset, + stoppingOffset)); + } + } + + return splits; + } + + private List<SourceSplitBase> toLakeSnapshotSplits( + Map<Integer, List<LakeSplit>> lakeSplits, + @Nullable String partitionName, + @Nullable Long partitionId) { + List<SourceSplitBase> splits = new ArrayList<>(); + TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, -1); Review Comment: ``` List<SourceSplitBase> splits = new ArrayList<>(); for (LakeSplit lakeSplit : lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList())) { TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, lakeSplit.bucket()); splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit)); } return splits; ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java: ########## @@ -382,10 +397,18 @@ public boolean supportsNestedProjection() { public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray(); this.producedDataType = producedDataType.getLogicalType(); + if (lakeSource != null) { + lakeSource.withProject(projectedFields); + } } @Override public Result applyFilters(List<ResolvedExpression> filters) { + if (lakeSource != null) { + // todo: use real filters + lakeSource.withFilters(Collections.emptyList()); Review Comment: But partition filter push-down is a must in https://github.com/apache/fluss/pull/1527/files#diff-ebec624f8a39fadb5f5ab74ac8b8c32a2ae3cb3d497841807595b27bb4aa57d3R457, so I created issue #1141 to track it. You just need to leave a todo for #1141 in https://github.com/apache/fluss/pull/1527/files#diff-ebec624f8a39fadb5f5ab74ac8b8c32a2ae3cb3d497841807595b27bb4aa57d3R457 for this pr ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java: ########## @@ -0,0 +1,337 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.client.admin.Admin; +import com.alibaba.fluss.client.metadata.LakeSnapshot; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer; +import com.alibaba.fluss.flink.source.split.LogSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.PartitionInfo; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableInfo; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.InnerTableScan; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties; +import static com.alibaba.fluss.utils.Preconditions.checkState; + +/** A generator for lake splits. */ +public class LakeSplitGenerator { + + private final TableInfo tableInfo; + private final Admin flussAdmin; + private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever; + private final OffsetsInitializer stoppingOffsetInitializer; + private final int bucketCount; + + private final LakeSource<LakeSplit> lakeSource; + + public LakeSplitGenerator( + TableInfo tableInfo, + Admin flussAdmin, + LakeSource<LakeSplit> lakeSource, + OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever, + OffsetsInitializer stoppingOffsetInitializer, + int bucketCount) { + this.tableInfo = tableInfo; + this.flussAdmin = flussAdmin; + this.lakeSource = lakeSource; + this.bucketOffsetsRetriever = bucketOffsetsRetriever; + this.stoppingOffsetInitializer = stoppingOffsetInitializer; + this.bucketCount = bucketCount; + } + + public List<SourceSplitBase> generateHybridLakeSplits() throws Exception { + // get the file store + LakeSnapshot lakeSnapshotInfo = + flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get(); + FileStoreTable fileStoreTable = + getTable( + lakeSnapshotInfo.getSnapshotId(), + extractLakeCatalogProperties(tableInfo.getProperties())); + + boolean isLogTable = !tableInfo.hasPrimaryKey(); + boolean isPartitioned = tableInfo.isPartitioned(); + + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits = + groupLakeSplits( + lakeSource + .createPlanner( + (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId) + .plan()); + if (isPartitioned) { + List<PartitionInfo> partitionInfos = + flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get(); + Map<Long, String> partitionNameById = + partitionInfos.stream() + .collect( + Collectors.toMap( + PartitionInfo::getPartitionId, + PartitionInfo::getPartitionName)); + return 
generatePartitionTableSplit( + lakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + partitionNameById, + fileStoreTable); + } else { + Map<Integer, List<LakeSplit>> nonPartitionLakeSplits = + lakeSplits.values().iterator().next(); + // non-partitioned table + return generateNoPartitionedTableSplit( + nonPartitionLakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + fileStoreTable); + } + } + + private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) { + Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>(); + for (LakeSplit split : lakeSplits) { + String partition = String.join("$", split.partition()); + int bucket = split.bucket(); + // Get or create the partition group + Map<Integer, List<LakeSplit>> bucketMap = + result.computeIfAbsent(partition, k -> new HashMap<>()); + List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()); + splitList.add(split); + } + return result; + } + + private List<SourceSplitBase> generatePartitionTableSplit( + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Long, String> partitionNameById, + @Nullable FileStoreTable fileStoreTable) + throws Exception { + List<SourceSplitBase> splits = new ArrayList<>(); + for (Map.Entry<Long, String> partitionNameByIdEntry : partitionNameById.entrySet()) { Review Comment: With this logic, if fluss only has partitions p1, p2 due to partition TTL, but the lake table has partitions p0, p1, p2, then p0 will not be read. Suggest changing it to ``` List<SourceSplitBase> splits = new ArrayList<>(); Map<String, Long> flussPartitionIdByName = flussPartitionNameById.entrySet().stream() .collect( Collectors.toMap( Map.Entry::getValue, Map.Entry::getKey, (existing, replacement) -> existing, LinkedHashMap::new)); long lakeSplitPartitionId = -1L; // iterate lake splits for (Map.Entry<String, Map<Integer, List<LakeSplit>>> lakeSplitEntry : lakeSplits.entrySet()) { String partitionName = lakeSplitEntry.getKey(); Map<Integer, List<LakeSplit>> lakeSplitsOfPartition = lakeSplitEntry.getValue(); Long partitionId = flussPartitionIdByName.remove(partitionName); if (partitionId != null) { // means the partition also exists in fluss Map<Integer, Long> bucketEndOffset = stoppingOffsetInitializer.getBucketOffsets( partitionName, IntStream.range(0, bucketCount) .boxed() .collect(Collectors.toList()), bucketOffsetsRetriever); splits.addAll( generateSplit( lakeSplitsOfPartition, partitionId, partitionName, isLogTable, tableBucketSnapshotLogOffset, bucketEndOffset, fileStoreTable)); } else { // only lake data splits.addAll( toLakeSnapshotSplits( lakeSplitsOfPartition, partitionName, // now, we can't get the partition id for a partition that only // exists in the lake, so set it to an arbitrary partition id, but // make sure different partitions have different partition ids // so that different partitions can be distributed to different // tasks lakeSplitPartitionId--)); } } // iterate remaining fluss partitions for (Map.Entry<String, Long> partitionIdByNameEntry : flussPartitionIdByName.entrySet()) { String partitionName = partitionIdByNameEntry.getKey(); long partitionId = partitionIdByNameEntry.getValue(); Map<Integer, Long> bucketEndOffset = stoppingOffsetInitializer.getBucketOffsets( partitionName, IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()), bucketOffsetsRetriever); splits.addAll( generateSplit( null, partitionId, partitionName, isLogTable, // pass empty map since we won't read lake splits
Collections.emptyMap(), bucketEndOffset, fileStoreTable)); } return splits; ``` ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java: ########## @@ -382,10 +397,18 @@ public boolean supportsNestedProjection() { public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray(); this.producedDataType = producedDataType.getLogicalType(); + if (lakeSource != null) { + lakeSource.withProject(projectedFields); + } } @Override public Result applyFilters(List<ResolvedExpression> filters) { + if (lakeSource != null) { + // todo: use real filters + lakeSource.withFilters(Collections.emptyList()); Review Comment: let's just do nothing here, and just leave a todo to apply filters as future work. ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java: ########## @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.state; + +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.flink.source.split.SourceSplitState; + +/** . */ Review Comment: Add comments ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java: ########## @@ -17,19 +17,25 @@ package com.alibaba.fluss.flink.source.split; -import com.alibaba.fluss.flink.lakehouse.LakeSplitSerializer; +import com.alibaba.fluss.flink.lake.LakeSplitSerializer; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.TableBucket; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import javax.annotation.Nullable; + import java.io.IOException; +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + /** A serializer for the {@link SourceSplitBase}. */ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSplitBase> { - public static final SourceSplitSerializer INSTANCE = new SourceSplitSerializer(); + public static final SourceSplitSerializer INSTANCE = new SourceSplitSerializer(null); Review Comment: remove this since it's error-prone and only used in testing code. ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java: ########## @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.utils; + +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.lakestorage.LakeStorage; +import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin; +import com.alibaba.fluss.lake.lakestorage.LakeStoragePluginSetUp; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.TablePath; + +import java.util.Map; + +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + +/** */ Review Comment: add comments. ########## fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java: ########## @@ -46,28 +49,31 @@ public class LakeSplitReaderGenerator { private final Table table; - private final Connection connection; private final TablePath tablePath; private FileStoreTable fileStoreTable; private final @Nullable int[] projectedFields; + private final @Nullable LakeSource<LakeSplit> lakeSource; public LakeSplitReaderGenerator( Table table, - Connection connection, TablePath tablePath, - @Nullable int[] projectedFields) { + @Nullable int[] projectedFields, + @Nullable LakeSource<LakeSplit> lakeSource) { this.table = table; - this.connection = connection; this.tablePath = tablePath; this.projectedFields = projectedFields; + this.lakeSource = lakeSource; } public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) { if (split instanceof PaimonSnapshotSplit) { boundedSplits.add(split); } else if (split instanceof PaimonSnapshotAndFlussLogSplit) { boundedSplits.add(split); + } else if (split instanceof LakeSnapshotSplit) { Review Comment: We should support `LakeSnapshotSplit` in this pr.
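A minimal sketch of the suggested handling, assuming a `LakeSnapshotSplit` is bounded like the Paimon split types above (the quoted hunk does not show the original fallback branch, so that part is illustrative):

```java
public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) {
    // All lake-backed splits are bounded, so queue them for the bounded reader.
    if (split instanceof PaimonSnapshotSplit
            || split instanceof PaimonSnapshotAndFlussLogSplit
            || split instanceof LakeSnapshotSplit) {
        boundedSplits.add(split);
    } else {
        // Illustrative fallback; the actual handling of non-lake splits
        // is not visible in the quoted diff.
        throw new UnsupportedOperationException(
                "Unsupported split type: " + split.getClass().getName());
    }
}
```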
