xushiyan commented on code in PR #17562:
URL: https://github.com/apache/hudi/pull/17562#discussion_r2629258823


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
+import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.HoodieSplitProvider;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Continuous Hoodie enumerator that discovers Hoodie splits from new Hoodie commits of upstream Hoodie table.
+ */
+public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerator {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieContinuousSplitEnumerator.class);
+
+  private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext;
+  private final HoodieSplitProvider splitProvider;
+  private final HoodieContinuousSplitDiscover splitDiscover;
+  private final ScanContext scanContext;
+
+  /**
+   * Instant for the last enumerated commit. Next incremental enumeration should be based off
+   * this as the starting position.
+   */
+  private final AtomicReference<HoodieEnumeratorPosition> position;
+
+  public HoodieContinuousSplitEnumerator(
+      SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext,
+      HoodieSplitProvider splitProvider,
+      HoodieContinuousSplitDiscover splitDiscover,
+      ScanContext scanContext,
+      Option<HoodieSplitEnumeratorState> enumStateOpt) {
+    super(enumeratorContext, splitProvider);
+    this.enumeratorContext = enumeratorContext;
+    this.splitProvider = splitProvider;
+    this.splitDiscover = splitDiscover;
+    this.scanContext = scanContext;
+    this.position = new AtomicReference<>();
+
+    if (enumStateOpt.isPresent()) {
+      this.position.set(HoodieEnumeratorPosition.of(enumStateOpt.get().getLastEnumeratedInstant(), enumStateOpt.get().getLastEnumeratedInstantOffset()));
+    } else {
+      // We need to set the instantOffset as null for the first read. For first read, the start instant is inclusive,
+      // while for continuous incremental read, the start instant is exclusive.
+      this.position.set(HoodieEnumeratorPosition.of(Option.empty(), Option.empty()));

Review Comment:
   @HuangZhenQiu @danny0405 there is a semantic discrepancy and an implementation gap here.
   
   For the normal incremental query flow, the first read should have a starting instant pointing to the earliest available timeline instant, and that instant is inclusive for the range. For subsequent reads, the start instant is exclusive for the range.
   
   But in `HoodieEnumeratorPosition`, I don't see such range info (inclusive/exclusive) being captured. How would the `SplitDiscover` be able to get the correct splits?
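   One hedged way to close the gap (names below are illustrative, not the PR's actual fields): record the boundary semantics alongside the instant, so the discoverer can build the correct range for the first read vs. subsequent reads.

```java
// Hypothetical sketch, not the PR's API: a position that records whether the
// start instant is inclusive, so split discovery can build the correct
// instant range for the first read vs. subsequent incremental reads.
class BoundedPositionSketch {
  private final String startInstant;    // null means "start from the earliest instant"
  private final boolean startInclusive;

  private BoundedPositionSketch(String startInstant, boolean startInclusive) {
    this.startInstant = startInstant;
    this.startInclusive = startInclusive;
  }

  // First read: start from the earliest available instant, inclusive.
  static BoundedPositionSketch initial() {
    return new BoundedPositionSketch(null, true);
  }

  // Subsequent reads: start after the last enumerated instant, exclusive.
  static BoundedPositionSketch after(String lastEnumeratedInstant) {
    return new BoundedPositionSketch(lastEnumeratedInstant, false);
  }

  String startInstant() {
    return startInstant;
  }

  boolean isStartInclusive() {
    return startInclusive;
  }
}
```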



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
+import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.HoodieSplitProvider;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Continuous Hoodie enumerator that discovers Hoodie splits from new Hoodie commits of upstream Hoodie table.
+ */
+public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerator {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieContinuousSplitEnumerator.class);
+
+  private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext;
+  private final HoodieSplitProvider splitProvider;
+  private final HoodieContinuousSplitDiscover splitDiscover;
+  private final ScanContext scanContext;
+
+  /**
+   * Instant for the last enumerated commit. Next incremental enumeration should be based off
+   * this as the starting position.
+   */
+  private final AtomicReference<HoodieEnumeratorPosition> position;
+
+  public HoodieContinuousSplitEnumerator(
+      SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext,
+      HoodieSplitProvider splitProvider,
+      HoodieContinuousSplitDiscover splitDiscover,
+      ScanContext scanContext,
+      Option<HoodieSplitEnumeratorState> enumStateOpt) {
+    super(enumeratorContext, splitProvider);
+    this.enumeratorContext = enumeratorContext;
+    this.splitProvider = splitProvider;
+    this.splitDiscover = splitDiscover;
+    this.scanContext = scanContext;
+    this.position = new AtomicReference<>();
+
+    if (enumStateOpt.isPresent()) {
+      this.position.set(HoodieEnumeratorPosition.of(enumStateOpt.get().getLastEnumeratedInstant(), enumStateOpt.get().getLastEnumeratedInstantOffset()));
+    } else {
+      // We need to set the instantOffset as null for the first read. For first read, the start instant is inclusive,
+      // while for continuous incremental read, the start instant is exclusive.
+      this.position.set(HoodieEnumeratorPosition.of(Option.empty(), Option.empty()));
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    enumeratorContext.callAsync(
+        this::discoverSplits,
+        this::processDiscoveredSplits,
+        0L,
+        scanContext.getScanInterval().toMillis());
+  }
+
+  @Override
+  boolean shouldWaitForMoreSplits() {
+    return true;
+  }
+
+  @Override
+  public HoodieSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
+    return new HoodieSplitEnumeratorState(splitProvider.state(), position.get().issuedInstant(), position.get().issuedOffset());
+  }
+
+  private HoodieContinuousSplitBatch discoverSplits() {
+    int pendingSplitNumber = splitProvider.pendingSplitCount();
+    if (pendingSplitNumber > scanContext.getMaxPendingSplits()) {
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitNumber);
+      return HoodieContinuousSplitBatch.EMPTY;
+    }
+    return splitDiscover.discoverSplits(position.get().issuedOffset().isPresent() ? position.get().issuedOffset().get() : null);

Review Comment:
   ```suggestion
       return splitDiscover.discoverSplits(position.get().issuedOffset().orElse(null));
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java:
##########


Review Comment:
   The current `toString()` is a human-readable description of the object, not suitable to be used as an id. Also, the id should uniquely identify the split reading the same set of records; it should at least contain the file id, which is not present currently.
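   A hedged sketch of what a stable id could look like (field names below are illustrative, not the actual `HoodieSourceSplit` fields): derive the id deterministically from the fields that identify the set of records being read, including the file id.

```java
// Hypothetical sketch: build the split id from identifying fields (at least
// the file id) instead of reusing the human-readable toString(). Two splits
// reading the same set of records produce the same id.
class SplitIdSketch {
  static String splitId(String partitionPath, String fileId, String instantRange) {
    return partitionPath + "_" + fileId + "_" + instantRange;
  }
}
```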



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.IncrementalInputSplits;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Result from continuous enumerator. It has the same semantics as {@link org.apache.hudi.source.IncrementalInputSplits.Result}.
+ */
+public class HoodieContinuousSplitBatch {
+  public static final HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), "", "");
+  private final Collection<HoodieSourceSplit> splits;
+  private final String endInstant; // end instant to consume to
+  private final String offset;     // monotonic increasing consumption offset
+
+  /**
+   * @param splits should never be null. But it can be an empty collection
+   * @param endInstant should never be null, end instant to consume to
+   * @param offset could be null. monotonic increasing consumption offset
+   */
+  public HoodieContinuousSplitBatch(
+      Collection<HoodieSourceSplit> splits,
+      String endInstant,
+      String offset) {
+
+    ValidationUtils.checkArgument(splits != null, "Invalid splits collection: null");
+    ValidationUtils.checkArgument(endInstant != null, "Invalid end instant: null");
+    this.splits = splits;
+    this.endInstant = endInstant;
+    this.offset = offset;
+  }
+
+  public static HoodieContinuousSplitBatch fromResult(IncrementalInputSplits.Result result) {
+    List<HoodieSourceSplit> splits = result.getInputSplits().stream().map(split ->
+        new HoodieSourceSplit(
+            HoodieSourceSplit.SPLIT_COUNTER.incrementAndGet(),

Review Comment:
   `HoodieSourceSplit` should not hold `SPLIT_COUNTER` itself; the counter is better managed by the caller that creates the splits, and it could be initialized here.
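   A hedged sketch of the caller-managed alternative (names below are illustrative): the component creating the splits owns the counter and assigns ids, and the counter can be seeded from restored enumerator state on recovery.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the caller that creates splits owns the counter, so
// HoodieSourceSplit stays a plain value object. The counter is seeded with
// the highest id already issued (0 for a fresh start).
class SplitIdGeneratorSketch {
  private final AtomicLong counter;

  SplitIdGeneratorSketch(long lastIssuedId) {
    this.counter = new AtomicLong(lastIssuedId);
  }

  long nextSplitId() {
    return counter.incrementAndGet();
  }
}
```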



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to