Copilot commented on code in PR #17562:
URL: https://github.com/apache/hudi/pull/17562#discussion_r2615926890


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -303,6 +305,26 @@ public Result inputSplits(
     }
   }
 
+  /**
+   * Returns the incremental Hoodie source split batch.
+   *
+   * @param metaClient    The meta client
+   * @param startInstant  The start Instant of the splits
+   * @param cdcEnabled    Whether cdc is enabled
+   *
+   * @return The list of incremental input splits or empty if there are no new instants
+   */
+  public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+      HoodieTableMetaClient metaClient,
+      @Nullable String startInstant,
+      boolean cdcEnabled) {

Review Comment:
   Missing input validation for the metaClient parameter. The method should 
validate that metaClient is not null before using it, as calling methods on a 
null metaClient would result in a NullPointerException with an unclear error 
message.
   ```suggestion
         boolean cdcEnabled) {
       if (metaClient == null) {
         throw new IllegalArgumentException("metaClient must not be null");
       }
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -303,6 +305,26 @@ public Result inputSplits(
     }
   }
 
+  /**
+   * Returns the incremental Hoodie source split batch.
+   *
+   * @param metaClient    The meta client
+   * @param startInstant  The start Instant of the splits
+   * @param cdcEnabled    Whether cdc is enabled
+   *
+   * @return The list of incremental input splits or empty if there are no new instants
+   */
+  public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+      HoodieTableMetaClient metaClient,
+      @Nullable String startInstant,
+      boolean cdcEnabled) {
+    Result result = inputSplits(metaClient, startInstant, cdcEnabled);
+    List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> new HoodieSourceSplit(
+        ++HoodieSourceSplit.SPLIT_COUNTER, split.getBasePath().orElse(null), split.getLogPaths(), split.getTablePath(), split.getMergeType(), split.getFileId())).collect(Collectors.toList());

Review Comment:
   The prefix increment operator (++HoodieSourceSplit.SPLIT_COUNTER) in a 
stream operation is problematic because: 1) it's not thread-safe as discussed 
in the SPLIT_COUNTER declaration, and 2) the order of execution in parallel 
streams could lead to non-deterministic split ID assignments. Consider using a 
proper atomic counter or a different approach to generate unique split IDs.
   ```suggestion
       AtomicInteger splitIdCounter = new AtomicInteger(HoodieSourceSplit.SPLIT_COUNTER);
       List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> new HoodieSourceSplit(
           splitIdCounter.incrementAndGet(), split.getBasePath().orElse(null), split.getLogPaths(), split.getTablePath(), split.getMergeType(), split.getFileId())).collect(Collectors.toList());
       // Persist the advanced counter so the next batch continues numbering from it.
       HoodieSourceSplit.SPLIT_COUNTER = splitIdCounter.get();
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/***
+ * The position of {@link HoodieContinuousSplitEnumerator}.
+ */
+public class HoodieEnumeratorPosition implements Serializable {
+  private final String lastInstant;
+  private final String lastInstantCompletionTime;
+
+  static HoodieEnumeratorPosition empty() {
+    return new HoodieEnumeratorPosition(null, null);
+  }
+
+  static HoodieEnumeratorPosition of(String lastInstant, String lastInstantCompletionTime) {
+    return new HoodieEnumeratorPosition(lastInstant, lastInstantCompletionTime);
+  }
+
+  private HoodieEnumeratorPosition(String lastInstant, String lastInstantCompletionTime) {
+    this.lastInstant = lastInstant;
+    this.lastInstantCompletionTime = lastInstantCompletionTime;
+  }
+
+  String lastInstant() {
+    return lastInstant;
+  }
+
+  String lastInstantCompletionTime() {

Review Comment:
   The lastInstant() and lastInstantCompletionTime() accessors are package-private even though the class itself is public. If enumerator positions are meant to be read outside this package (for example by consumers of the HoodieContinuousSplitEnumerator's checkpointed state), these accessors should be public to present a consistent API.
   ```suggestion
     public String lastInstant() {
       return lastInstant;
     }
   
     public String lastInstantCompletionTime() {
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Result from continuous enumerator.
+ */
+public class HoodieContinuousSplitBatch {
+  public static HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), "", "");

Review Comment:
   The empty string literals passed to EMPTY should use null instead for 
semantic correctness. According to the javadoc comment on line 38, the 
toInstant "can have null instant timestamp in String format". Using empty 
strings instead of null values is inconsistent with the documented behavior and 
could lead to confusion. The fromInstant comment also indicates it "can be 
null".
   ```suggestion
    public static HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), null, null);
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for continuously discovery new splits from active timeline.

Review Comment:
   The phrase "continuously discovery" is ungrammatical; the gerund "continuously discovering" is the correct form here.
   ```suggestion
    * Interface for continuously discovering new splits from the active timeline.
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java:
##########
@@ -31,8 +31,8 @@
  * Hoodie SourceSplit implementation for source V2.
  */
 public class HoodieSourceSplit implements SourceSplit, Serializable {
+  public static int SPLIT_COUNTER = 0;

Review Comment:
   The static mutable field SPLIT_COUNTER is not thread-safe. In a concurrent 
environment like Flink, multiple threads could increment this counter 
simultaneously, leading to race conditions and potentially duplicate split IDs. 
This field should either be changed to an AtomicInteger or the counter 
management should be handled differently.
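To make the thread-safety point concrete, here is a minimal, self-contained sketch of the `AtomicInteger` alternative the comment suggests. The `SplitIdGenerator` class name is hypothetical and not part of the PR; it only demonstrates that atomic increments yield distinct IDs even under parallel execution.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical replacement for the mutable static int: AtomicInteger
// guarantees every incrementAndGet() call returns a distinct value,
// even when splits are created from multiple threads.
public class SplitIdGenerator {
  private static final AtomicInteger COUNTER = new AtomicInteger(0);

  public static int nextId() {
    return COUNTER.incrementAndGet();
  }

  public static void main(String[] args) {
    // Generate 10_000 ids in parallel; distinct() keeps all of them,
    // which ++SPLIT_COUNTER on a plain static int would not guarantee.
    List<Integer> ids = IntStream.range(0, 10_000)
        .parallel()
        .map(i -> nextId())
        .boxed()
        .distinct()
        .collect(Collectors.toList());
    System.out.println(ids.size()); // 10000: no duplicate ids
  }
}
```

With a plain `static int`, two threads can read the same value before either writes back, producing duplicate split IDs; the atomic version closes that window.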



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -303,6 +305,26 @@ public Result inputSplits(
     }
   }
 
+  /**
+   * Returns the incremental Hoodie source split batch.
+   *
+   * @param metaClient    The meta client
+   * @param startInstant  The start Instant of the splits
+   * @param cdcEnabled    Whether cdc is enabled
+   *
+   * @return The list of incremental input splits or empty if there are no new instants
+   */
+  public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+      HoodieTableMetaClient metaClient,
+      @Nullable String startInstant,
+      boolean cdcEnabled) {
+    Result result = inputSplits(metaClient, startInstant, cdcEnabled);
+    List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> new HoodieSourceSplit(
+        ++HoodieSourceSplit.SPLIT_COUNTER, split.getBasePath().orElse(null), split.getLogPaths(), split.getTablePath(), split.getMergeType(), split.getFileId())).collect(Collectors.toList());

Review Comment:
   The line formatting violates code style conventions by placing multiple 
operations on a single long line (map operation with constructor call). This 
reduces readability. Consider breaking this into multiple lines with proper 
indentation, placing the lambda expression body on separate lines.
   ```suggestion
       List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> {
         return new HoodieSourceSplit(
             ++HoodieSourceSplit.SPLIT_COUNTER,
             split.getBasePath().orElse(null),
             split.getLogPaths(),
             split.getTablePath(),
             split.getMergeType(),
             split.getFileId()
         );
       }).collect(Collectors.toList());
   ```



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java:
##########
@@ -31,8 +31,8 @@
  * Hoodie SourceSplit implementation for source V2.
  */
 public class HoodieSourceSplit implements SourceSplit, Serializable {
+  public static int SPLIT_COUNTER = 0;

Review Comment:
   The public static field SPLIT_COUNTER violates encapsulation and is 
problematic for several reasons: 1) It's directly accessible and modifiable 
from anywhere, 2) It doesn't reset between test runs which could cause test 
interference, 3) It makes the class non-thread-safe. Consider making this field 
private and providing controlled access through methods, or better yet, use a 
different approach for generating split IDs that doesn't rely on mutable static 
state.
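As a sketch of the encapsulated alternative the comment asks for (the `HoodieSplitIds` class and `resetForTesting` hook are illustrative names, not from the PR), the counter can be private with a single generation method and an explicit reset hook for tests:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the counter is private, so no caller can mutate or
// observe it directly; resetForTesting() gives tests a controlled hook
// instead of leaking mutable static state across test runs.
public final class HoodieSplitIds {
  private static final AtomicInteger COUNTER = new AtomicInteger(0);

  private HoodieSplitIds() {
  }

  /** Returns the next unique split id, starting from 1. */
  public static int next() {
    return COUNTER.incrementAndGet();
  }

  /** Visible for tests only: resets the counter to zero. */
  static void resetForTesting() {
    COUNTER.set(0);
  }

  public static void main(String[] args) {
    System.out.println(next()); // 1
    System.out.println(next()); // 2
  }
}
```

Keeping the field private addresses all three concerns at once: callers cannot modify it, tests reset it through one controlled method, and the `AtomicInteger` makes increments thread-safe.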



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for continuously discovery new splits from active timeline.
+ */
+@Internal
+public interface HoodieContinuousSplitDiscover {
+
+  /** Discover the files appended between {@code lostInstant} and the last commit of current table */

Review Comment:
   The typo "lostInstant" should be "lastInstant" in the javadoc comment.
   ```suggestion
    /** Discover the files appended between {@code lastInstant} and the last commit of current table */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
