xushiyan commented on code in PR #17562: URL: https://github.com/apache/hudi/pull/17562#discussion_r2629258823
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.enumerator; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.source.ScanContext; +import org.apache.hudi.source.split.HoodieContinuousSplitBatch; +import org.apache.hudi.source.split.HoodieContinuousSplitDiscover; +import org.apache.hudi.source.split.HoodieSourceSplit; +import org.apache.hudi.source.split.HoodieSplitProvider; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Continuous Hoodie enumerator that discovers Hoodie splits from new Hoodie commits of upstream Hoodie table. 
+ */ +public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerator { + private static final Logger LOG = LoggerFactory.getLogger(HoodieContinuousSplitEnumerator.class); + + private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext; + private final HoodieSplitProvider splitProvider; + private final HoodieContinuousSplitDiscover splitDiscover; + private final ScanContext scanContext; + + /** + * Instant for the last enumerated commit. Next incremental enumeration should be based off + * this as the starting position. + */ + private final AtomicReference<HoodieEnumeratorPosition> position; + + public HoodieContinuousSplitEnumerator( + SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext, + HoodieSplitProvider splitProvider, + HoodieContinuousSplitDiscover splitDiscover, + ScanContext scanContext, + Option<HoodieSplitEnumeratorState> enumStateOpt) { + super(enumeratorContext, splitProvider); + this.enumeratorContext = enumeratorContext; + this.splitProvider = splitProvider; + this.splitDiscover = splitDiscover; + this.scanContext = scanContext; + this.position = new AtomicReference<>(); + + if (enumStateOpt.isPresent()) { + this.position.set(HoodieEnumeratorPosition.of(enumStateOpt.get().getLastEnumeratedInstant(), enumStateOpt.get().getLastEnumeratedInstantOffset())); + } else { + // We need to set the instantOffset as null for the first read. For first read, the start instant is inclusive, + // while for continuous incremental read, the start instant is exclusive. + this.position.set(HoodieEnumeratorPosition.of(Option.empty(), Option.empty())); Review Comment: @HuangZhenQiu @danny0405 there is a semantics discrepancy and impl gap in this. For normal incremental query flow, the first read should have a starting instant pointing to the earliest available timeline instant, and the instant will be inclusive for the range. For subsequent read, the start instant will be exclusive for the range. 
But in HoodieEnumeratorPosition, I don't see such range info (inclusive/exclusive) being captured. How would the SplitDiscover be able to get the correct splits? ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.enumerator; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.source.ScanContext; +import org.apache.hudi.source.split.HoodieContinuousSplitBatch; +import org.apache.hudi.source.split.HoodieContinuousSplitDiscover; +import org.apache.hudi.source.split.HoodieSourceSplit; +import org.apache.hudi.source.split.HoodieSplitProvider; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Continuous Hoodie enumerator that discovers Hoodie splits from new Hoodie commits of upstream Hoodie table. 
+ */ +public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerator { + private static final Logger LOG = LoggerFactory.getLogger(HoodieContinuousSplitEnumerator.class); + + private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext; + private final HoodieSplitProvider splitProvider; + private final HoodieContinuousSplitDiscover splitDiscover; + private final ScanContext scanContext; + + /** + * Instant for the last enumerated commit. Next incremental enumeration should be based off + * this as the starting position. + */ + private final AtomicReference<HoodieEnumeratorPosition> position; + + public HoodieContinuousSplitEnumerator( + SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext, + HoodieSplitProvider splitProvider, + HoodieContinuousSplitDiscover splitDiscover, + ScanContext scanContext, + Option<HoodieSplitEnumeratorState> enumStateOpt) { + super(enumeratorContext, splitProvider); + this.enumeratorContext = enumeratorContext; + this.splitProvider = splitProvider; + this.splitDiscover = splitDiscover; + this.scanContext = scanContext; + this.position = new AtomicReference<>(); + + if (enumStateOpt.isPresent()) { + this.position.set(HoodieEnumeratorPosition.of(enumStateOpt.get().getLastEnumeratedInstant(), enumStateOpt.get().getLastEnumeratedInstantOffset())); + } else { + // We need to set the instantOffset as null for the first read. For first read, the start instant is inclusive, + // while for continuous incremental read, the start instant is exclusive. 
+ this.position.set(HoodieEnumeratorPosition.of(Option.empty(), Option.empty())); + } + } + + @Override + public void start() { + super.start(); + enumeratorContext.callAsync( + this::discoverSplits, + this::processDiscoveredSplits, + 0L, + scanContext.getScanInterval().toMillis()); + } + + @Override + boolean shouldWaitForMoreSplits() { + return true; + } + + @Override + public HoodieSplitEnumeratorState snapshotState(long checkpointId) throws Exception { + return new HoodieSplitEnumeratorState(splitProvider.state(), position.get().issuedInstant(), position.get().issuedOffset()); + } + + private HoodieContinuousSplitBatch discoverSplits() { + int pendingSplitNumber = splitProvider.pendingSplitCount(); + if (pendingSplitNumber > scanContext.getMaxPendingSplits()) { + LOG.info( + "Pause split discovery as the assigner already has too many pending splits: {}", + pendingSplitNumber); + return HoodieContinuousSplitBatch.EMPTY; + } + return splitDiscover.discoverSplits(position.get().issuedOffset().isPresent() ? position.get().issuedOffset().get() : null); Review Comment: ```suggestion return splitDiscover.discoverSplits(position.get().issuedOffset().orElse(null)); ``` ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java: ########## Review Comment: the current toString() is a human-readable form of info to describe the object, not suitable to be used as an id. also this should be able to uniquely identify the split reading the same set of records? it should at least contain file id, which is not present currently ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.split; + +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.source.IncrementalInputSplits; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Result from continuous enumerator. It has the same semantic to the {@link org.apache.hudi.source.IncrementalInputSplits.Result}. + */ +public class HoodieContinuousSplitBatch { + public static final HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), "", ""); + private final Collection<HoodieSourceSplit> splits; + private final String endInstant; // end instant to consume to + private final String offset; // monotonic increasing consumption offset + + /** + * @param splits should never be null. But it can be an empty collection + * @param endInstant should never be null, end instant to consume to + * @param offset could be null. 
monotonic increasing consumption offset + */ + public HoodieContinuousSplitBatch( + Collection<HoodieSourceSplit> splits, + String endInstant, + String offset) { + + ValidationUtils.checkArgument(splits != null, "Invalid to splits collection: null"); + ValidationUtils.checkArgument(endInstant != null, "Invalid end instant: null"); + this.splits = splits; + this.endInstant = endInstant; + this.offset = offset; + } + + public static HoodieContinuousSplitBatch fromResult(IncrementalInputSplits.Result result) { + List<HoodieSourceSplit> splits = result.getInputSplits().stream().map(split -> + new HoodieSourceSplit( + HoodieSourceSplit.SPLIT_COUNTER.incrementAndGet(), Review Comment: `HoodieSourceSplit` should not hold `SPLIT_COUNTER` itself. The counter is better managed by the caller that creates the splits; it could be initialized here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
