This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
     new 4cf486d68 [flink] Union read decouple with paimon for log table (#1527)
4cf486d68 is described below

commit 4cf486d6808e0babadade19f40812c2bf15d54a1
Author: CaoZhen <caozhen1...@outlook.com>
AuthorDate: Wed Aug 13 04:50:20 2025 -0700

    [flink] Union read decouple with paimon for log table (#1527)
---
 .../alibaba/fluss/flink/catalog/FlinkCatalog.java  |   2 +-
 .../fluss/flink/catalog/FlinkTableFactory.java     |   5 +-
 .../flink/{lakehouse => lake}/LakeCatalog.java     |   2 +-
 .../LakeRecordRecordEmitter.java                   |   6 +-
 .../fluss/flink/lake/LakeSplitGenerator.java       | 391 +++++++++++++++++++++
 .../LakeSplitReaderGenerator.java                  |  25 +-
 .../fluss/flink/lake/LakeSplitSerializer.java      | 126 +++++++
 .../LakeSplitStateInitializer.java                 |  13 +-
 .../{lakehouse => lake}/LakeTableFactory.java      |   2 +-
 .../flink/lake/reader/LakeSnapshotScanner.java     |  97 +++++
 .../fluss/flink/lake/split/LakeSnapshotSplit.java  |  77 ++++
 .../flink/lake/state/LakeSnapshotSplitState.java   |  48 +++
 .../alibaba/fluss/flink/source/FlinkSource.java    |  44 ++-
 .../fluss/flink/source/FlinkTableSource.java       |  28 +-
 .../flink/source/emitter/FlinkRecordEmitter.java   |   2 +-
 .../source/enumerator/FlinkSourceEnumerator.java   |  41 ++-
 .../flink/source/reader/FlinkSourceReader.java     |  10 +-
 .../source/reader/FlinkSourceSplitReader.java      |  13 +-
 .../flink/source/split/SourceSplitSerializer.java  |  31 +-
 .../fluss/flink/utils/FlinkConversions.java        |  16 +
 .../alibaba/fluss/flink/utils/LakeSourceUtils.java |  52 +++
 .../enumerator/FlinkSourceEnumeratorTest.java      |   3 +-
 .../flink/source/reader/FlinkSourceReaderTest.java |   3 +-
 .../source/reader/FlinkSourceSplitReaderTest.java  |   8 +-
 .../source/split/SourceSplitSerializerTest.java    |   2 +-
 fluss-test-coverage/pom.xml                        |   1 +
 26 files changed, 993 insertions(+), 55 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
index 6dca5dca5..476b1aabf 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
@@ -24,7 +24,7 @@ import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.exception.InvalidTableException;
-import com.alibaba.fluss.flink.lakehouse.LakeCatalog;
+import com.alibaba.fluss.flink.lake.LakeCatalog;
 import com.alibaba.fluss.flink.procedure.ProcedureManager;
 import com.alibaba.fluss.flink.utils.CatalogExceptionUtils;
 import com.alibaba.fluss.flink.utils.DataLakeUtils;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
index e9b5cc313..c943aa5f3 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
@@ -20,7 +20,7 @@ package com.alibaba.fluss.flink.catalog;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.flink.FlinkConnectorOptions;
-import com.alibaba.fluss.flink.lakehouse.LakeTableFactory;
+import com.alibaba.fluss.flink.lake.LakeTableFactory;
 import com.alibaba.fluss.flink.sink.FlinkTableSink;
 import com.alibaba.fluss.flink.source.FlinkTableSource;
 import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -143,7 +143,8 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
                 cache,
                 partitionDiscoveryIntervalMs,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
-                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
+                context.getCatalogTable().getOptions());
     }

     @Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
similarity index 98%
rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java
rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
index 3f42b8ff7..8c24e6e3c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;

 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
similarity index 87%
rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java
rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
index 5133ddec4..207422ed8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */

-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;

 import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
 import com.alibaba.fluss.flink.source.reader.RecordAndPos;
@@ -48,6 +49,9 @@ public class LakeRecordRecordEmitter<OUT> {
             ((PaimonSnapshotAndFlussLogSplitState) splitState)
                     .setRecordsToSkip(recordAndPos.readRecordsCount());
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
+        } else if (splitState instanceof LakeSnapshotSplitState) {
+            ((LakeSnapshotSplitState) splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
+            sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else {
             throw new UnsupportedOperationException(
                     "Unknown split state type: " + splitState.getClass());
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
new file mode 100644
index 000000000..8c148ccf5
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A generator for lake splits.
*/ +public class LakeSplitGenerator { + + private final TableInfo tableInfo; + private final Admin flussAdmin; + private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever; + private final OffsetsInitializer stoppingOffsetInitializer; + private final int bucketCount; + + private final LakeSource<LakeSplit> lakeSource; + + public LakeSplitGenerator( + TableInfo tableInfo, + Admin flussAdmin, + LakeSource<LakeSplit> lakeSource, + OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever, + OffsetsInitializer stoppingOffsetInitializer, + int bucketCount) { + this.tableInfo = tableInfo; + this.flussAdmin = flussAdmin; + this.lakeSource = lakeSource; + this.bucketOffsetsRetriever = bucketOffsetsRetriever; + this.stoppingOffsetInitializer = stoppingOffsetInitializer; + this.bucketCount = bucketCount; + } + + public List<SourceSplitBase> generateHybridLakeSplits() throws Exception { + // get the file store + LakeSnapshot lakeSnapshotInfo = + flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get(); + FileStoreTable fileStoreTable = + getTable( + lakeSnapshotInfo.getSnapshotId(), + extractLakeCatalogProperties(tableInfo.getProperties())); + + boolean isLogTable = !tableInfo.hasPrimaryKey(); + boolean isPartitioned = tableInfo.isPartitioned(); + + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits = + groupLakeSplits( + lakeSource + .createPlanner( + (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId) + .plan()); + if (isPartitioned) { + List<PartitionInfo> partitionInfos = + flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get(); + Map<Long, String> partitionNameById = + partitionInfos.stream() + .collect( + Collectors.toMap( + PartitionInfo::getPartitionId, + PartitionInfo::getPartitionName)); + return generatePartitionTableSplit( + lakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + partitionNameById, + fileStoreTable); + } else { + Map<Integer, List<LakeSplit>> nonPartitionLakeSplits = + lakeSplits.values().iterator().next(); + // non-partitioned table + return generateNoPartitionedTableSplit( + nonPartitionLakeSplits, + isLogTable, + lakeSnapshotInfo.getTableBucketsOffset(), + fileStoreTable); + } + } + + private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) { + Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>(); + for (LakeSplit split : lakeSplits) { + String partition = String.join("$", split.partition()); + int bucket = split.bucket(); + // Get or create the partition group + Map<Integer, List<LakeSplit>> bucketMap = + result.computeIfAbsent(partition, k -> new HashMap<>()); + List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>()); + splitList.add(split); + } + return result; + } + + private List<SourceSplitBase> generatePartitionTableSplit( + Map<String, Map<Integer, List<LakeSplit>>> lakeSplits, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Long, String> partitionNameById, + @Nullable FileStoreTable fileStoreTable) + throws Exception { + List<SourceSplitBase> splits = new ArrayList<>(); + Map<String, Long> flussPartitionIdByName = + partitionNameById.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getValue, + Map.Entry::getKey, + (existing, replacement) -> existing, + LinkedHashMap::new)); + long lakeSplitPartitionId = -1L; + + // iterate lake splits + for (Map.Entry<String, Map<Integer, List<LakeSplit>>> lakeSplitEntry : + lakeSplits.entrySet()) { + String partitionName = 
lakeSplitEntry.getKey(); + Map<Integer, List<LakeSplit>> lakeSplitsOfPartition = lakeSplitEntry.getValue(); + Long partitionId = flussPartitionIdByName.remove(partitionName); + if (partitionId != null) { + // mean the partition also exist in fluss partition + Map<Integer, Long> bucketEndOffset = + stoppingOffsetInitializer.getBucketOffsets( + partitionName, + IntStream.range(0, bucketCount) + .boxed() + .collect(Collectors.toList()), + bucketOffsetsRetriever); + splits.addAll( + generateSplit( + lakeSplitsOfPartition, + partitionId, + partitionName, + isLogTable, + tableBucketSnapshotLogOffset, + bucketEndOffset, + fileStoreTable)); + + } else { + // only lake data + splits.addAll( + toLakeSnapshotSplits( + lakeSplitsOfPartition, + partitionName, + // now, we can't get partition id for the partition only + // in lake, set them to a arbitrary partition id, but + // make sure different partition have different partition id + // to enable different partition can be distributed to different + // tasks + lakeSplitPartitionId--)); + } + } + + // iterate remain fluss splits + for (Map.Entry<String, Long> partitionIdByNameEntry : flussPartitionIdByName.entrySet()) { + String partitionName = partitionIdByNameEntry.getKey(); + long partitionId = partitionIdByNameEntry.getValue(); + Map<Integer, Long> bucketEndOffset = + stoppingOffsetInitializer.getBucketOffsets( + partitionName, + IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()), + bucketOffsetsRetriever); + splits.addAll( + generateSplit( + null, + partitionId, + partitionName, + isLogTable, + // pass empty map since we won't read lake splits + Collections.emptyMap(), + bucketEndOffset, + fileStoreTable)); + } + return splits; + } + + private List<SourceSplitBase> generateSplit( + @Nullable Map<Integer, List<LakeSplit>> lakeSplits, + @Nullable Long partitionId, + @Nullable String partitionName, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + Map<Integer, Long> bucketEndOffset, + @Nullable FileStoreTable fileStoreTable) { + List<SourceSplitBase> splits = new ArrayList<>(); + if (isLogTable) { + if (lakeSplits != null) { + splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId)); + } + for (int bucket = 0; bucket < bucketCount; bucket++) { + TableBucket tableBucket = + new TableBucket(tableInfo.getTableId(), partitionId, bucket); + Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket); + long stoppingOffset = bucketEndOffset.get(bucket); + if (snapshotLogOffset == null) { + // no any data commit to this bucket, scan from fluss log + splits.add( + new LogSplit( + tableBucket, partitionName, EARLIEST_OFFSET, stoppingOffset)); + } else { + // need to read remain fluss log + if (snapshotLogOffset < stoppingOffset) { + splits.add( + new LogSplit( + tableBucket, + partitionName, + snapshotLogOffset, + stoppingOffset)); + } + } + } + } else { + // it's primary key table + for (int bucket = 0; bucket < bucketCount; bucket++) { + TableBucket tableBucket = + new TableBucket(tableInfo.getTableId(), partitionId, bucket); + Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket); + long stoppingOffset = bucketEndOffset.get(bucket); + FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator(); + + splits.add( + generateSplitForPrimaryKeyTableBucket( + fileStoreTable, + splitGenerator, + tableBucket, + partitionName, + snapshotLogOffset, + stoppingOffset)); + } + } + + return splits; + } + + private List<SourceSplitBase> 
toLakeSnapshotSplits( + Map<Integer, List<LakeSplit>> lakeSplits, + @Nullable String partitionName, + @Nullable Long partitionId) { + List<SourceSplitBase> splits = new ArrayList<>(); + for (LakeSplit lakeSplit : + lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList())) { + TableBucket tableBucket = + new TableBucket(tableInfo.getTableId(), partitionId, lakeSplit.bucket()); + splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit)); + } + return splits; + } + + private SourceSplitBase generateSplitForPrimaryKeyTableBucket( + FileStoreTable fileStoreTable, + FileStoreSourceSplitGenerator splitGenerator, + TableBucket tableBucket, + @Nullable String partitionName, + @Nullable Long snapshotLogOffset, + long stoppingOffset) { + + // no snapshot data for this bucket or no a corresponding log offset in this bucket, + // can only scan from change log + if (snapshotLogOffset == null || snapshotLogOffset < 0) { + return new PaimonSnapshotAndFlussLogSplit( + tableBucket, partitionName, null, EARLIEST_OFFSET, stoppingOffset); + } + + // then, generate a split contains + // snapshot and change log so that we can merge change log and snapshot + // to get the full data + fileStoreTable = + fileStoreTable.copy( + Collections.singletonMap( + CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(), + // we set a max size to make sure only one splits + MemorySize.MAX_VALUE.toString())); + InnerTableScan tableScan = + fileStoreTable.newScan().withBucketFilter((b) -> b == tableBucket.getBucket()); + + if (partitionName != null) { + tableScan = + tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName)); + } + + List<FileStoreSourceSplit> fileStoreSourceSplits = + splitGenerator.createSplits(tableScan.plan()); + + checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key table must be 1."); + FileStoreSourceSplit fileStoreSourceSplit = fileStoreSourceSplits.get(0); + return new PaimonSnapshotAndFlussLogSplit( + tableBucket, + partitionName, + fileStoreSourceSplit, + snapshotLogOffset, + stoppingOffset); + } + + private Map<String, String> getPartitionSpec( + FileStoreTable fileStoreTable, String partitionName) { + List<String> partitionKeys = fileStoreTable.partitionKeys(); + checkState( + partitionKeys.size() == 1, + "Must only one partition key for paimon table %, but got %s, the partition keys are: ", + tableInfo.getTablePath(), + partitionKeys.size(), + partitionKeys); + return Collections.singletonMap(partitionKeys.get(0), partitionName); + } + + private FileStoreTable getTable(long snapshotId, Map<String, String> catalogProperties) + throws Exception { + try (Catalog catalog = + FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) { + return (FileStoreTable) + catalog.getTable( + Identifier.create( + tableInfo.getTablePath().getDatabaseName(), + tableInfo.getTablePath().getTableName())) + .copy( + Collections.singletonMap( + CoreOptions.SCAN_SNAPSHOT_ID.key(), + String.valueOf(snapshotId))); + } + } + + private List<SourceSplitBase> generateNoPartitionedTableSplit( + Map<Integer, List<LakeSplit>> lakeSplits, + boolean isLogTable, + Map<TableBucket, Long> tableBucketSnapshotLogOffset, + FileStoreTable fileStoreTable) { + // iterate all bucket + // assume bucket is from 0 to bucket count + Map<Integer, Long> bucketEndOffset = + stoppingOffsetInitializer.getBucketOffsets( + null, + IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()), + bucketOffsetsRetriever); + return generateSplit( + lakeSplits, + null, + 
null, + isLogTable, + tableBucketSnapshotLogOffset, + bucketEndOffset, + fileStoreTable); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java similarity index 84% rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java index ffe7b4bd6..02e5e5e5b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package com.alibaba.fluss.flink.lakehouse; +package com.alibaba.fluss.flink.lake; -import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.table.Table; +import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner; import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner; import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit; @@ -26,6 +27,8 @@ import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit; import com.alibaba.fluss.flink.source.reader.BoundedSplitReader; import com.alibaba.fluss.flink.source.split.SourceSplitBase; import com.alibaba.fluss.flink.utils.DataLakeUtils; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.TablePath; import org.apache.flink.util.ExceptionUtils; @@ -46,21 +49,21 @@ import java.util.stream.IntStream; public class LakeSplitReaderGenerator { private final Table table; - private final Connection connection; private final TablePath tablePath; private FileStoreTable fileStoreTable; private final @Nullable int[] projectedFields; + private final @Nullable LakeSource<LakeSplit> lakeSource; public LakeSplitReaderGenerator( Table table, - Connection connection, TablePath tablePath, - @Nullable int[] projectedFields) { + @Nullable int[] projectedFields, + @Nullable LakeSource<LakeSplit> lakeSource) { this.table = table; - this.connection = connection; this.tablePath = tablePath; this.projectedFields = projectedFields; + this.lakeSource = lakeSource; } public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) { @@ -68,6 +71,9 @@ public class LakeSplitReaderGenerator { boundedSplits.add(split); } else if (split instanceof PaimonSnapshotAndFlussLogSplit) { boundedSplits.add(split); + } else if (split instanceof LakeSnapshotSplit) { + boundedSplits.add(split); + // TODO support primary key table in https://github.com/apache/fluss/issues/1434 } else { throw new UnsupportedOperationException( String.format("The split type of %s is not supported.", split.getClass())); @@ -100,6 +106,13 @@ public class LakeSplitReaderGenerator { return new BoundedSplitReader( paimonSnapshotAndLogSplitScanner, paimonSnapshotAndFlussLogSplit.getRecordsToSkip()); + } else if (split instanceof LakeSnapshotSplit) { + LakeSnapshotSplit lakeSnapshotSplit = (LakeSnapshotSplit) split; + LakeSnapshotScanner lakeSnapshotScanner = + new LakeSnapshotScanner(lakeSource, 
lakeSnapshotSplit); + return new BoundedSplitReader( + lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSplit()); + // TODO support primary key table in https://github.com/apache/fluss/issues/1434 } else { throw new UnsupportedOperationException( String.format("The split type of %s is not supported.", split.getClass())); diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java new file mode 100644 index 000000000..19b6f29c7 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake; + +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit; +import com.alibaba.fluss.flink.source.split.LogSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.TableBucket; + +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.paimon.flink.source.FileStoreSourceSplit; +import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer; + +import javax.annotation.Nullable; + +import java.io.IOException; + +import static com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit.LAKE_SNAPSHOT_SPLIT_KIND; +import static com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND; + +/** A serializer for lake split. 
*/ +public class LakeSplitSerializer { + + private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer; + + public LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer) { + this.sourceSplitSerializer = sourceSplitSerializer; + } + + public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IOException { + if (split instanceof LakeSnapshotSplit) { + byte[] serializeBytes = + sourceSplitSerializer.serialize(((LakeSnapshotSplit) split).getLakeSplit()); + out.writeInt(serializeBytes.length); + out.write(serializeBytes); + } else if (split instanceof PaimonSnapshotAndFlussLogSplit) { + // TODO support primary key table in https://github.com/apache/fluss/issues/1434 + FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer = + new FileStoreSourceSplitSerializer(); + // writing file store source split + PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit = + ((PaimonSnapshotAndFlussLogSplit) split); + FileStoreSourceSplit fileStoreSourceSplit = + paimonSnapshotAndFlussLogSplit.getSnapshotSplit(); + if (fileStoreSourceSplit == null) { + // no snapshot data for the bucket + out.writeBoolean(false); + } else { + out.writeBoolean(true); + byte[] serializeBytes = + fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit); + out.writeInt(serializeBytes.length); + out.write(serializeBytes); + } + // writing starting/stopping offset + out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset()); + out.writeLong( + paimonSnapshotAndFlussLogSplit + .getStoppingOffset() + .orElse(LogSplit.NO_STOPPING_OFFSET)); + out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip()); + } else { + throw new UnsupportedOperationException( + "Unsupported split type: " + split.getClass().getName()); + } + } + + public SourceSplitBase deserialize( + byte splitKind, + TableBucket tableBucket, + @Nullable String partition, + DataInputDeserializer input) + throws IOException { + if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) { + byte[] serializeBytes = new byte[input.readInt()]; + input.read(serializeBytes); + LakeSplit fileStoreSourceSplit = + sourceSplitSerializer.deserialize( + sourceSplitSerializer.getVersion(), serializeBytes); + return new LakeSnapshotSplit(tableBucket, partition, fileStoreSourceSplit); + // TODO support primary key table in https://github.com/apache/fluss/issues/1434 + } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) { + FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer = + new FileStoreSourceSplitSerializer(); + FileStoreSourceSplit fileStoreSourceSplit = null; + if (input.readBoolean()) { + byte[] serializeBytes = new byte[input.readInt()]; + input.read(serializeBytes); + fileStoreSourceSplit = + fileStoreSourceSplitSerializer.deserialize( + fileStoreSourceSplitSerializer.getVersion(), serializeBytes); + } + long startingOffset = input.readLong(); + long stoppingOffset = input.readLong(); + long recordsToSkip = input.readLong(); + return new PaimonSnapshotAndFlussLogSplit( + tableBucket, + partition, + fileStoreSourceSplit, + startingOffset, + stoppingOffset, + recordsToSkip); + } else { + throw new UnsupportedOperationException("Unsupported split kind: " + splitKind); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java similarity index 76% rename from 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java index 1fbd20a25..aae8987ad 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package com.alibaba.fluss.flink.lakehouse; +package com.alibaba.fluss.flink.lake; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState; import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit; import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState; -import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit; -import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState; import com.alibaba.fluss.flink.source.split.SourceSplitBase; import com.alibaba.fluss.flink.source.split.SourceSplitState; @@ -28,10 +28,11 @@ import com.alibaba.fluss.flink.source.split.SourceSplitState; public class LakeSplitStateInitializer { public static SourceSplitState initializedState(SourceSplitBase split) { - if (split instanceof PaimonSnapshotSplit) { - return new PaimonSnapshotSplitState((PaimonSnapshotSplit) split); - } else if (split instanceof PaimonSnapshotAndFlussLogSplit) { + if (split instanceof PaimonSnapshotAndFlussLogSplit) { return new PaimonSnapshotAndFlussLogSplitState((PaimonSnapshotAndFlussLogSplit) split); + } else if (split instanceof LakeSnapshotSplit) { + return new LakeSnapshotSplitState((LakeSnapshotSplit) split); + // TODO support primary key table in https://github.com/apache/fluss/issues/1434 } else { throw new UnsupportedOperationException("Unsupported split type: " + split); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java similarity index 98% rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java index f5ea9a3d3..5092a59e6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package com.alibaba.fluss.flink.lakehouse; +package com.alibaba.fluss.flink.lake; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.source.DynamicTableSource; diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java new file mode 100644 index 000000000..8a51482ba --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.reader; + +import com.alibaba.fluss.client.table.scanner.batch.BatchScanner; +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.time.Duration; + +/** A scanner for reading lake split {@link LakeSnapshotSplit}. */ +public class LakeSnapshotScanner implements BatchScanner { + + private final LakeSource<LakeSplit> lakeSource; + private final LakeSnapshotSplit lakeSnapshotSplit; + + private CloseableIterator<InternalRow> rowsIterator; + + public LakeSnapshotScanner( + LakeSource<LakeSplit> lakeSource, LakeSnapshotSplit lakeSnapshotSplit) { + this.lakeSource = lakeSource; + this.lakeSnapshotSplit = lakeSnapshotSplit; + } + + @Nullable + @Override + public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException { + if (rowsIterator == null) { + rowsIterator = + InternalRowIterator.wrap( + lakeSource + .createRecordReader( + (LakeSource.ReaderContext<LakeSplit>) + lakeSnapshotSplit::getLakeSplit) + .read()); + } + return rowsIterator.hasNext() ? rowsIterator : null; + } + + @Override + public void close() throws IOException { + if (rowsIterator != null) { + rowsIterator.close(); + } + } + + private static class InternalRowIterator implements CloseableIterator<InternalRow> { + + private final CloseableIterator<LogRecord> recordCloseableIterator; + + private static InternalRowIterator wrap( + CloseableIterator<LogRecord> recordCloseableIterator) { + return new InternalRowIterator(recordCloseableIterator); + } + + private InternalRowIterator(CloseableIterator<LogRecord> recordCloseableIterator) { + this.recordCloseableIterator = recordCloseableIterator; + } + + @Override + public void close() { + recordCloseableIterator.close(); + } + + @Override + public boolean hasNext() { + return recordCloseableIterator.hasNext(); + } + + @Override + public InternalRow next() { + return recordCloseableIterator.next().getRow(); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java new file mode 100644 index 000000000..b2460177a --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.lake.split; + +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.TableBucket; + +import javax.annotation.Nullable; + +/** A split for reading a snapshot of lake. */ +public class LakeSnapshotSplit extends SourceSplitBase { + + public static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1; + + private final LakeSplit lakeSplit; + + private final long recordsToSplit; + + public LakeSnapshotSplit( + TableBucket tableBucket, @Nullable String partitionName, LakeSplit lakeSplit) { + this(tableBucket, partitionName, lakeSplit, 0); + } + + public LakeSnapshotSplit( + TableBucket tableBucket, + @Nullable String partitionName, + LakeSplit lakeSplit, + long recordsToSplit) { + super(tableBucket, partitionName); + this.lakeSplit = lakeSplit; + this.recordsToSplit = recordsToSplit; + } + + public LakeSplit getLakeSplit() { + return lakeSplit; + } + + public long getRecordsToSplit() { + return recordsToSplit; + } + + @Override + public String splitId() { + return toSplitId( + "lake-snapshot-", + new TableBucket( + tableBucket.getTableId(), + tableBucket.getPartitionId(), + lakeSplit.bucket())); + } + + @Override + public boolean isLakeSplit() { + return true; + } + + @Override + public byte splitKind() { + return LAKE_SNAPSHOT_SPLIT_KIND; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java new file mode 100644 index 000000000..ae6eb3fa4 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.flink.lake.state; + +import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit; +import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.flink.source.split.SourceSplitState; + +/** The state of {@link LakeSnapshotSplit}. */ +public class LakeSnapshotSplitState extends SourceSplitState { + + private final LakeSnapshotSplit split; + private long recordsToSplit; + + public LakeSnapshotSplitState(LakeSnapshotSplit split) { + super(split); + this.split = split; + this.recordsToSplit = split.getRecordsToSplit(); + } + + public void setRecordsToSkip(long recordsToSkip) { + this.recordsToSplit = recordsToSkip; + } + + @Override + public SourceSplitBase toSourceSplit() { + return new LakeSnapshotSplit( + split.getTableBucket(), + split.getPartitionName(), + split.getLakeSplit(), + recordsToSplit); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java index d806164e0..1b4f750b0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java @@ -31,6 +31,8 @@ import com.alibaba.fluss.flink.source.split.SourceSplitSerializer; import com.alibaba.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer; import com.alibaba.fluss.flink.source.state.SourceEnumeratorState; import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.types.RowType; @@ -70,6 +72,8 @@ public class FlinkSource<OUT> private final List<FieldEqual> partitionFilters; + private final @Nullable LakeSource<LakeSplit> lakeSource; + public FlinkSource( Configuration flussConf, TablePath tablePath, @@ -82,6 +86,34 @@ public class FlinkSource<OUT> FlussDeserializationSchema<OUT> deserializationSchema, boolean streaming, List<FieldEqual> partitionFilters) { + this( + flussConf, + tablePath, + hasPrimaryKey, + isPartitioned, + sourceOutputType, + projectedFields, + offsetsInitializer, + scanPartitionDiscoveryIntervalMs, + deserializationSchema, + streaming, + partitionFilters, + null); + } + + public FlinkSource( + Configuration flussConf, + TablePath tablePath, + boolean hasPrimaryKey, + boolean isPartitioned, + RowType sourceOutputType, + @Nullable int[] projectedFields, + OffsetsInitializer offsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + FlussDeserializationSchema<OUT> deserializationSchema, + boolean streaming, + List<FieldEqual> partitionFilters, + LakeSource<LakeSplit> lakeSource) { this.flussConf = flussConf; this.tablePath = tablePath; this.hasPrimaryKey = hasPrimaryKey; @@ -93,6 +125,7 @@ public class FlinkSource<OUT> this.deserializationSchema = deserializationSchema; this.streaming = streaming; this.partitionFilters = checkNotNull(partitionFilters); + this.lakeSource = lakeSource; } @Override @@ -112,7 +145,8 @@ public class FlinkSource<OUT> offsetsInitializer, scanPartitionDiscoveryIntervalMs, streaming, - partitionFilters); + partitionFilters, + lakeSource); } @Override @@ -130,12 +164,13 @@ public class FlinkSource<OUT> offsetsInitializer, scanPartitionDiscoveryIntervalMs, streaming, - partitionFilters); + partitionFilters, + lakeSource); } @Override public 
SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() { - return SourceSplitSerializer.INSTANCE; + return new SourceSplitSerializer(lakeSource); } @Override @@ -166,7 +201,8 @@ public class FlinkSource<OUT> context, projectedFields, flinkSourceReaderMetrics, - recordEmitter); + recordEmitter, + lakeSource); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java index 65e8ac813..deb986a50 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java @@ -28,6 +28,8 @@ import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils; import com.alibaba.fluss.flink.utils.FlinkConversions; import com.alibaba.fluss.flink.utils.PushdownUtils; import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.MergeEngineType; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.types.RowType; @@ -76,6 +78,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import static com.alibaba.fluss.flink.utils.LakeSourceUtils.createLakeSource; import static com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE; import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals; import static com.alibaba.fluss.utils.Preconditions.checkNotNull; @@ -130,6 +133,10 @@ public class FlinkTableSource private List<FieldEqual> partitionFilters = Collections.emptyList(); + private final Map<String, String> tableOptions; + + @Nullable private LakeSource<LakeSplit> lakeSource; + public FlinkTableSource( TablePath tablePath, Configuration flussConfig, @@ -144,7 +151,8 @@ public class FlinkTableSource @Nullable LookupCache cache, long scanPartitionDiscoveryIntervalMs, boolean isDataLakeEnabled, - @Nullable MergeEngineType mergeEngineType) { + @Nullable MergeEngineType mergeEngineType, + Map<String, String> tableOptions) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableOutputType = tableOutputType; @@ -162,6 +170,10 @@ public class FlinkTableSource this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; this.isDataLakeEnabled = isDataLakeEnabled; this.mergeEngineType = mergeEngineType; + this.tableOptions = tableOptions; + if (isDataLakeEnabled) { + this.lakeSource = createLakeSource(tablePath, tableOptions); + } } @Override @@ -270,7 +282,8 @@ public class FlinkTableSource scanPartitionDiscoveryIntervalMs, new RowDataDeserializationSchema(), streaming, - partitionFilters); + partitionFilters, + lakeSource); if (!streaming) { // return a bounded source provide to make planner happy, @@ -359,12 +372,14 @@ public class FlinkTableSource cache, scanPartitionDiscoveryIntervalMs, isDataLakeEnabled, - mergeEngineType); + mergeEngineType, + tableOptions); source.producedDataType = producedDataType; source.projectedFields = projectedFields; source.singleRowFilter = singleRowFilter; source.modificationScanType = modificationScanType; source.partitionFilters = partitionFilters; + source.lakeSource = lakeSource; return source; } @@ -382,10 +397,17 @@ public class FlinkTableSource public void applyProjection(int[][] projectedFields, DataType producedDataType) { 
this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray(); this.producedDataType = producedDataType.getLogicalType(); + if (lakeSource != null) { + lakeSource.withProject(projectedFields); + } } @Override public Result applyFilters(List<ResolvedExpression> filters) { + if (lakeSource != null) { + // todo: use real filters + } + List<ResolvedExpression> acceptedFilters = new ArrayList<>(); List<ResolvedExpression> remainingFilters = new ArrayList<>(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java index 3e62fdd0e..84602ca35 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java @@ -18,7 +18,7 @@ package com.alibaba.fluss.flink.source.emitter; import com.alibaba.fluss.client.table.scanner.ScanRecord; -import com.alibaba.fluss.flink.lakehouse.LakeRecordRecordEmitter; +import com.alibaba.fluss.flink.lake.LakeRecordRecordEmitter; import com.alibaba.fluss.flink.source.deserializer.FlussDeserializationSchema; import com.alibaba.fluss.flink.source.reader.FlinkSourceReader; import com.alibaba.fluss.flink.source.reader.RecordAndPos; diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index e65e64464..05ac7fa6e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -23,7 +23,7 @@ import com.alibaba.fluss.client.admin.Admin; import com.alibaba.fluss.client.metadata.KvSnapshots; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.flink.lakehouse.LakeSplitGenerator; +import com.alibaba.fluss.flink.lake.LakeSplitGenerator; import com.alibaba.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl; import com.alibaba.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer; import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer; @@ -36,6 +36,8 @@ import com.alibaba.fluss.flink.source.split.LogSplit; import com.alibaba.fluss.flink.source.split.SourceSplitBase; import com.alibaba.fluss.flink.source.state.SourceEnumeratorState; import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.PartitionInfo; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableInfo; @@ -130,6 +132,8 @@ public class FlinkSourceEnumerator private final List<FieldEqual> partitionFilters; + @Nullable private final LakeSource<LakeSplit> lakeSource; + public FlinkSourceEnumerator( TablePath tablePath, Configuration flussConf, @@ -140,6 +144,30 @@ public class FlinkSourceEnumerator long scanPartitionDiscoveryIntervalMs, boolean streaming, List<FieldEqual> partitionFilters) { + this( + tablePath, + flussConf, + hasPrimaryKey, + isPartitioned, + context, + startingOffsetsInitializer, + 
scanPartitionDiscoveryIntervalMs, + streaming, + partitionFilters, + null); + } + + public FlinkSourceEnumerator( + TablePath tablePath, + Configuration flussConf, + boolean hasPrimaryKey, + boolean isPartitioned, + SplitEnumeratorContext<SourceSplitBase> context, + OffsetsInitializer startingOffsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + boolean streaming, + List<FieldEqual> partitionFilters, + LakeSource<LakeSplit> lakeSource) { this( tablePath, flussConf, @@ -151,7 +179,8 @@ public class FlinkSourceEnumerator startingOffsetsInitializer, scanPartitionDiscoveryIntervalMs, streaming, - partitionFilters); + partitionFilters, + lakeSource); } public FlinkSourceEnumerator( @@ -165,7 +194,8 @@ public class FlinkSourceEnumerator OffsetsInitializer startingOffsetsInitializer, long scanPartitionDiscoveryIntervalMs, boolean streaming, - List<FieldEqual> partitionFilters) { + List<FieldEqual> partitionFilters, + @Nullable LakeSource<LakeSplit> lakeSource) { this.tablePath = checkNotNull(tablePath); this.flussConf = checkNotNull(flussConf); this.hasPrimaryKey = hasPrimaryKey; @@ -180,6 +210,7 @@ public class FlinkSourceEnumerator this.partitionFilters = checkNotNull(partitionFilters); this.stoppingOffsetsInitializer = streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest(); + this.lakeSource = lakeSource; } @Override @@ -485,10 +516,12 @@ public class FlinkSourceEnumerator new LakeSplitGenerator( tableInfo, flussAdmin, + lakeSource, bucketOffsetsRetriever, stoppingOffsetsInitializer, tableInfo.getNumBuckets()); - return lakeSplitGenerator.generateLakeSplits(); + List<SourceSplitBase> lakeSplits = lakeSplitGenerator.generateHybridLakeSplits(); + return lakeSplits; } private boolean ignoreTableBucket(TableBucket tableBucket) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java index ba7f6642a..d2db9ac8c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java @@ -18,7 +18,7 @@ package com.alibaba.fluss.flink.source.reader; import com.alibaba.fluss.config.Configuration; -import com.alibaba.fluss.flink.lakehouse.LakeSplitStateInitializer; +import com.alibaba.fluss.flink.lake.LakeSplitStateInitializer; import com.alibaba.fluss.flink.source.emitter.FlinkRecordEmitter; import com.alibaba.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent; import com.alibaba.fluss.flink.source.event.PartitionsRemovedEvent; @@ -28,6 +28,8 @@ import com.alibaba.fluss.flink.source.split.HybridSnapshotLogSplitState; import com.alibaba.fluss.flink.source.split.LogSplitState; import com.alibaba.fluss.flink.source.split.SourceSplitBase; import com.alibaba.fluss.flink.source.split.SourceSplitState; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.types.RowType; @@ -57,7 +59,8 @@ public class FlinkSourceReader<OUT> SourceReaderContext context, @Nullable int[] projectedFields, FlinkSourceReaderMetrics flinkSourceReaderMetrics, - FlinkRecordEmitter<OUT> recordEmitter) { + FlinkRecordEmitter<OUT> recordEmitter, + LakeSource<LakeSplit> lakeSource) { super( elementsQueue, new 
FlinkSourceFetcherManager( @@ -68,7 +71,8 @@ public class FlinkSourceReader<OUT> tablePath, sourceOutputType, projectedFields, - flinkSourceReaderMetrics), + flinkSourceReaderMetrics, + lakeSource), (ignore) -> {}), recordEmitter, context.getConfiguration(), diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java index 55c57c79c..ac18b5283 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -26,13 +26,15 @@ import com.alibaba.fluss.client.table.scanner.log.LogScanner; import com.alibaba.fluss.client.table.scanner.log.ScanRecords; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.PartitionNotExistException; -import com.alibaba.fluss.flink.lakehouse.LakeSplitReaderGenerator; +import com.alibaba.fluss.flink.lake.LakeSplitReaderGenerator; import com.alibaba.fluss.flink.metrics.FlinkMetricRegistry; import com.alibaba.fluss.flink.source.metrics.FlinkSourceReaderMetrics; import com.alibaba.fluss.flink.source.split.HybridSnapshotLogSplit; import com.alibaba.fluss.flink.source.split.LogSplit; import com.alibaba.fluss.flink.source.split.SnapshotSplit; import com.alibaba.fluss.flink.source.split.SourceSplitBase; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.types.RowType; @@ -99,6 +101,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS private final Table table; private final FlinkMetricRegistry flinkMetricRegistry; + @Nullable private LakeSource<LakeSplit> lakeSource; + // table id, will be null when haven't received any split private Long tableId; @@ -116,7 +120,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS TablePath tablePath, RowType sourceOutputType, @Nullable int[] projectedFields, - FlinkSourceReaderMetrics flinkSourceReaderMetrics) { + FlinkSourceReaderMetrics flinkSourceReaderMetrics, + @Nullable LakeSource<LakeSplit> lakeSource) { this.flinkMetricRegistry = new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup()); this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry); @@ -131,6 +136,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS this.logScanner = table.newScan().project(projectedFields).createLogScanner(); this.stoppingOffsets = new HashMap<>(); this.emptyLogSplits = new HashSet<>(); + this.lakeSource = lakeSource; } @Override @@ -216,7 +222,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS private LakeSplitReaderGenerator getLakeSplitReader() { if (lakeSplitReaderGenerator == null) { lakeSplitReaderGenerator = - new LakeSplitReaderGenerator(table, connection, tablePath, projectedFields); + new LakeSplitReaderGenerator( + table, tablePath, projectedFields, checkNotNull(lakeSource)); } return lakeSplitReaderGenerator; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java index dda093086..cdbbd3216 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java @@ -17,20 +17,24 @@ package com.alibaba.fluss.flink.source.split; -import com.alibaba.fluss.flink.lakehouse.LakeSplitSerializer; +import com.alibaba.fluss.flink.lake.LakeSplitSerializer; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; import com.alibaba.fluss.metadata.TableBucket; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import javax.annotation.Nullable; + import java.io.IOException; +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + /** A serializer for the {@link SourceSplitBase}. */ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSplitBase> { - public static final SourceSplitSerializer INSTANCE = new SourceSplitSerializer(); - private static final int VERSION_0 = 0; private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = @@ -41,7 +45,11 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSp private static final int CURRENT_VERSION = VERSION_0; - private LakeSplitSerializer lakeSplitSerializer; + @Nullable private final LakeSource<LakeSplit> lakeSource; + + public SourceSplitSerializer(LakeSource<LakeSplit> lakeSource) { + this.lakeSource = lakeSource; + } @Override public int getVersion() { @@ -75,7 +83,9 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSp out.writeLong(logSplit.getStoppingOffset().orElse(LogSplit.NO_STOPPING_OFFSET)); } } else { - getLakeSplitSerializer().serialize(out, split); + LakeSplitSerializer lakeSplitSerializer = + new LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer()); + lakeSplitSerializer.serialize(out, split); } final byte[] result = out.getCopyOfBuffer(); @@ -135,14 +145,9 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSp long stoppingOffset = in.readLong(); return new LogSplit(tableBucket, partitionName, startingOffset, stoppingOffset); } else { - return getLakeSplitSerializer().deserialize(splitKind, tableBucket, partitionName, in); - } - } - - private LakeSplitSerializer getLakeSplitSerializer() { - if (lakeSplitSerializer == null) { - lakeSplitSerializer = new LakeSplitSerializer(); + LakeSplitSerializer lakeSplitSerializer = + new LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer()); + return lakeSplitSerializer.deserialize(splitKind, tableBucket, partitionName, in); } - return lakeSplitSerializer; } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java index da65bd646..1c95cf87e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.config.Password; import com.alibaba.fluss.flink.FlinkConnectorOptions; import 
com.alibaba.fluss.flink.catalog.FlinkCatalogFactory; +import com.alibaba.fluss.metadata.DataLakeFormat; import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableDescriptor; @@ -51,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_KEY; @@ -94,6 +96,20 @@ public class FlinkConversions { // put fluss table properties into flink options, to make the properties visible to users convertFlussTablePropertiesToFlinkOptions(tableInfo.getProperties().toMap(), newOptions); + // put lake related options to table options + Optional<DataLakeFormat> optDataLakeFormat = tableInfo.getTableConfig().getDataLakeFormat(); + if (optDataLakeFormat.isPresent()) { + DataLakeFormat dataLakeFormat = optDataLakeFormat.get(); + String dataLakePrefix = "table.datalake." + dataLakeFormat + "."; + + for (Map.Entry<String, String> tableProperty : + tableInfo.getProperties().toMap().entrySet()) { + if (tableProperty.getKey().startsWith(dataLakePrefix)) { + newOptions.put(tableProperty.getKey(), tableProperty.getValue()); + } + } + } + org.apache.flink.table.api.Schema.Builder schemaBuilder = org.apache.flink.table.api.Schema.newBuilder(); if (tableInfo.hasPrimaryKey()) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java new file mode 100644 index 000000000..6b849e243 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.flink.utils; + +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.lakestorage.LakeStorage; +import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin; +import com.alibaba.fluss.lake.lakestorage.LakeStoragePluginSetUp; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.LakeSplit; +import com.alibaba.fluss.metadata.TablePath; + +import java.util.Map; + +import static com.alibaba.fluss.utils.Preconditions.checkNotNull; + +/** Utils for create lake source. 
*/ +public class LakeSourceUtils { + + @SuppressWarnings("unchecked") + public static LakeSource<LakeSplit> createLakeSource( + TablePath tablePath, Map<String, String> properties) { + Map<String, String> catalogProperties = + DataLakeUtils.extractLakeCatalogProperties(Configuration.fromMap(properties)); + Configuration lakeConfig = Configuration.fromMap(catalogProperties); + + String dataLake = + Configuration.fromMap(properties) + .get(ConfigOptions.TABLE_DATALAKE_FORMAT) + .toString(); + LakeStoragePlugin lakeStoragePlugin = + LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null); + LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig); + return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index e518f7b05..a7594a0b5 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -356,7 +356,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase { OffsetsInitializer.earliest(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming, - Collections.emptyList()); + Collections.emptyList(), + null); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java index d29749853..4ce072631 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java @@ -180,6 +180,7 @@ class FlinkSourceReaderTest extends FlinkTestBase { context, null, new FlinkSourceReaderMetrics(context.metricGroup()), - recordEmitter); + recordEmitter, + null); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java index 29ead7607..4984b7d7b 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java @@ -89,7 +89,8 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT())), null, - createMockSourceReaderMetrics())) + createMockSourceReaderMetrics(), + null)) .isInstanceOf(ValidationException.class) .hasMessage( "The Flink query schema is not matched to Fluss table schema. 
\n" @@ -106,7 +107,8 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase { "id", DataTypes.BIGINT().copy(false)), DataTypes.FIELD("name", DataTypes.STRING())), new int[] {1, 0}, - createMockSourceReaderMetrics())) + createMockSourceReaderMetrics(), + null)) .isInstanceOf(ValidationException.class) .hasMessage( "The Flink query schema is not matched to Fluss table schema. \n" @@ -394,7 +396,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase { private FlinkSourceSplitReader createSplitReader(TablePath tablePath, RowType rowType) { return new FlinkSourceSplitReader( - clientConf, tablePath, rowType, null, createMockSourceReaderMetrics()); + clientConf, tablePath, rowType, null, createMockSourceReaderMetrics(), null); } private FlinkSourceReaderMetrics createMockSourceReaderMetrics() { diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java index da6e7830b..7491eb568 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java @@ -31,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat; */ class SourceSplitSerializerTest { - private static final SourceSplitSerializer serializer = SourceSplitSerializer.INSTANCE; + private static final SourceSplitSerializer serializer = new SourceSplitSerializer(null); private static final TableBucket tableBucket = new TableBucket(1, 2); private static final TableBucket partitionedTableBucket = new TableBucket(1, 100L, 2); diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index d4015908b..aa565fbaa 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -319,6 +319,7 @@ <exclude>com.alibaba.fluss.metrics.*</exclude> <!-- end exclude for metric --> <exclude>com.alibaba.fluss.flink.lakehouse.*</exclude> + <exclude>com.alibaba.fluss.flink.lake.*</exclude> <exclude>com.alibaba.fluss.kafka.*</exclude> <!-- exclude for fluss-ci-tools --> <exclude>com.alibaba.fluss.tools.ci.*</exclude>