xushiyan commented on code in PR #17773: URL: https://github.com/apache/hudi/pull/17773#discussion_r2663121456
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.reader; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hudi.source.split.HoodieSourceSplit; +import org.apache.hudi.source.split.SerializableComparator; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; + +/** + * The split reader of Hoodie source. 
+ * + * @param <T> record type + */ +public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithPosition<T>, HoodieSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(HoodieSourceSplitReader.class); + + private final SerializableComparator<HoodieSourceSplit> splitComparator; + private final SplitReaderFunction<T> readerFunction; + private final int indexOfSubTask; + private final Queue<HoodieSourceSplit> splits; + + private HoodieSourceSplit currentSplit; + private String currentSplitId; + + public HoodieSourceSplitReader( + SourceReaderContext context, + SplitReaderFunction<T> readerFunction, + SerializableComparator<HoodieSourceSplit> splitComparator) { + this.splitComparator = splitComparator; + this.readerFunction = readerFunction; + this.indexOfSubTask = context.getIndexOfSubtask(); + this.splits = new ArrayDeque<>(); + } + + @Override + public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws IOException { + HoodieSourceSplit nextSplit = splits.poll(); + if (nextSplit != null) { + currentSplit = nextSplit; + currentSplitId = nextSplit.splitId(); + return readerFunction.read(currentSplit); + } else { + // return an empty result, which will lead to split fetch to be idle. + // SplitFetcherManager will then close idle fetcher. 
+ return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet()); + } + } + + @Override + public void handleSplitsChanges(SplitsChange<HoodieSourceSplit> splitsChange) { + if (!(splitsChange instanceof SplitsAddition)) { + throw new UnsupportedOperationException( + String.format("Unsupported split change: %s", splitsChange.getClass())); + } + + if (splitComparator != null) { + List<HoodieSourceSplit> newSplits = new ArrayList<>(splitsChange.splits()); + newSplits.sort(splitComparator); + LOG.info("Add {} splits to reader: {}", newSplits.size(), newSplits); + splits.addAll(newSplits); + } else { + LOG.info("Add {} splits to reader", splitsChange.splits().size()); + splits.addAll(splitsChange.splits()); + } + } + + @Override + public void wakeUp() { + // Nothing to do + } + + @Override + public void close() throws Exception { + currentSplitId = null; + } + + @Override + public void pauseOrResumeSplits( + Collection<HoodieSourceSplit> splitsToPause, + Collection<HoodieSourceSplit> splitsToResume) { + // SourceSplitReader only reads splits sequentially. When waiting for watermark alignment + // the SourceOperator will stop processing and recycling the fetched batches. This exhausts the + // {@link ArrayPoolDataIteratorBatcher#pool} and the `currentReader.next()` call will be + // blocked even without split-level watermark alignment. Based on this the + // `pauseOrResumeSplits` and the `wakeUp` are left empty. 
Review Comment: move this multi-line comment to the method doc and add `// no op` here ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputSplit.java: ########## @@ -43,7 +43,7 @@ public CdcInputSplit( long maxCompactionMemoryInBytes, String fileId, HoodieCDCFileSplit[] changes) { - super(splitNum, null, Option.empty(), "", tablePath, + super(splitNum, null, Option.empty(), "", tablePath, "", Review Comment: is there a constant for the empty-string partition path? ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java: ########## @@ -927,13 +927,13 @@ public static MergeOnReadInputSplit fileSlice2Split( .collect(Collectors.toList())); String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); return new MergeOnReadInputSplit(0, basePath, logPaths, - fileSlice.getLatestInstantTime(), tablePath, maxCompactionMemoryInBytes, + fileSlice.getLatestInstantTime(), tablePath, fileSlice.getPartitionPath(), maxCompactionMemoryInBytes, FlinkOptions.REALTIME_PAYLOAD_COMBINE, null, fileSlice.getFileId()); } public static MergeOnReadInputSplit singleLogFile2Split(String tablePath, String filePath, long maxCompactionMemoryInBytes) { return new MergeOnReadInputSplit(0, null, Option.of(Collections.singletonList(filePath)), - FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, maxCompactionMemoryInBytes, + FSUtils.getDeltaCommitTimeFromLogPath(new StoragePath(filePath)), tablePath, "", maxCompactionMemoryInBytes, Review Comment: using the empty partition path constant makes this more readable ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java: ########## @@ -47,8 +48,12 @@ public class HoodieSourceSplit implements SourceSplit, Serializable { private final Option<List<String>> logPaths; // the base table path private final String tablePath; + // partition path + private final String 
partitionPath; // source merge type private final String mergeType; + // the latest commit instant time + private final String latestCommit; Review Comment: calling it the "latest" does not seem appropriate for the corresponding commit, as there will always be new commits from time to time. this property would more accurately be named `as_of_instant_time` ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/MergeOnReadSplitReaderFunction.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source.reader.function; + +import org.apache.hudi.common.config.HoodieReaderConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.source.reader.BatchRecords; +import org.apache.hudi.source.reader.HoodieRecordWithPosition; +import org.apache.hudi.source.reader.SplitReaderFunction; +import org.apache.hudi.source.split.HoodieSourceSplit; +import org.apache.hudi.table.HoodieTable; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.table.data.RowData; + +import java.io.IOException; +import java.util.Collections; +import java.util.stream.Collectors; + +/** + * Reader function implementation for Merge On Read table. 
+ */ +public class MergeOnReadSplitReaderFunction<I, K, O> implements SplitReaderFunction<RowData> { + private final HoodieTable<RowData, I, K, O> hoodieTable; + private final HoodieReaderContext<RowData> readerContext; + private final HoodieSchema tableSchema; + private final HoodieSchema requiredSchema; + private final String mergeType; + private final Option<InternalSchema> internalSchemaOption; + private final TypedProperties props; + + public MergeOnReadSplitReaderFunction(HoodieTable<RowData, I, K, O> hoodieTable, HoodieReaderContext<RowData> readerContext) { + this(hoodieTable, readerContext, null, null, FlinkOptions.REALTIME_PAYLOAD_COMBINE, Option.empty()); + } + + public MergeOnReadSplitReaderFunction( + HoodieTable<RowData, I, K, O> hoodieTable, + HoodieReaderContext<RowData> readerContext, + HoodieSchema tableSchema, + HoodieSchema requiredSchema, + String mergeType, + Option<InternalSchema> internalSchemaOption) { + this.hoodieTable = hoodieTable; + this.readerContext = readerContext; + this.tableSchema = tableSchema; + this.requiredSchema = requiredSchema; + this.mergeType = mergeType; + this.internalSchemaOption = internalSchemaOption; + this.props = new TypedProperties(); + this.props.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType); + } + + @Override + public RecordsWithSplitIds<HoodieRecordWithPosition<RowData>> read(HoodieSourceSplit split) { + final String splitId = split.splitId(); + try (HoodieFileGroupReader<RowData> fileGroupReader = createFileGroupReader(split)) { + final ClosableIterator<RowData> recordIterator = fileGroupReader.getClosableIterator(); + BatchRecords<RowData> records = BatchRecords.forRecords(splitId, recordIterator, split.getFileOffset(), split.getRecordOffset()); + records.seek(split.getRecordOffset()); + return records; Review Comment: fileGroupReader will be closed before records being consumed by the caller. 
fileGroupReader lifecycle should be managed at a higher level ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/SplitReaderFunction.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.reader; Review Comment: why not under the function subpackage? ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/BatchRecords.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.reader; + +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; + +import java.util.Collections; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; + +/** + * Implementation of RecordsWithSplitIds with a list record inside. + * + * Type parameters: <T> – record type + */ +public class BatchRecords<T> implements RecordsWithSplitIds<HoodieRecordWithPosition<T>> { + private String splitId; + private final ClosableIterator<T> recordIterator; + private final Set<String> finishedSplits; + private final HoodieRecordWithPosition<T> recordAndPosition; + + // point to current read position within the records list + private int position; + + BatchRecords( + String splitId, + ClosableIterator<T> recordIterator, + int fileOffset, + long startingRecordOffset, + Set<String> finishedSplits) { + ValidationUtils.checkArgument( + finishedSplits != null, "finishedSplits can be empty but not null"); + ValidationUtils.checkArgument( + recordIterator != null, "recordIterator can be empty but not null"); + + this.splitId = splitId; + this.recordIterator = recordIterator; + this.finishedSplits = finishedSplits; + this.recordAndPosition = new HoodieRecordWithPosition<>(); + this.recordAndPosition.set(null, fileOffset, startingRecordOffset); + this.position = 0; + } + + @Nullable + @Override + public String nextSplit() { + String nextSplit = this.splitId; + // set the splitId to null to indicate no more splits + // this class only contains record for one split + this.splitId = null; + return nextSplit; + } + + @Nullable + @Override + public HoodieRecordWithPosition<T> nextRecordFromSplit() { + if (recordIterator.hasNext()) { + recordAndPosition.record(recordIterator.next()); + position++; + return 
recordAndPosition; + } else { + return null; + } + } + + @Override + public Set<String> finishedSplits() { + return finishedSplits; Review Comment: when will finishedSplits be populated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
