Copilot commented on code in PR #17562:
URL: https://github.com/apache/hudi/pull/17562#discussion_r2615926890
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -303,6 +305,26 @@ public Result inputSplits(
}
}
+ /**
+ * Returns the incremental Hoodie source split batch.
+ *
+ * @param metaClient The meta client
+ * @param startInstant The start Instant of the splits
+ * @param cdcEnabled Whether cdc is enabled
+ *
+ * @return The list of incremental input splits or empty if there are no new instants
+ */
+ public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+ HoodieTableMetaClient metaClient,
+ @Nullable String startInstant,
+ boolean cdcEnabled) {
Review Comment:
Missing input validation for the metaClient parameter. The method should
validate that metaClient is not null before using it, as calling methods on a
null metaClient would result in a NullPointerException with an unclear error
message.
```suggestion
boolean cdcEnabled) {
if (metaClient == null) {
throw new IllegalArgumentException("metaClient must not be null");
}
```
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -303,6 +305,26 @@ public Result inputSplits(
}
}
+ /**
+ * Returns the incremental Hoodie source split batch.
+ *
+ * @param metaClient The meta client
+ * @param startInstant The start Instant of the splits
+ * @param cdcEnabled Whether cdc is enabled
+ *
+ * @return The list of incremental input splits or empty if there are no new instants
+ */
+ public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+ HoodieTableMetaClient metaClient,
+ @Nullable String startInstant,
+ boolean cdcEnabled) {
+ Result result = inputSplits(metaClient, startInstant, cdcEnabled);
+ List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> new HoodieSourceSplit(
+ ++HoodieSourceSplit.SPLIT_COUNTER, split.getBasePath().orElse(null), split.getLogPaths(), split.getTablePath(), split.getMergeType(), split.getFileId())).collect(Collectors.toList());
Review Comment:
The prefix increment operator (++HoodieSourceSplit.SPLIT_COUNTER) in a
stream operation is problematic because: 1) it's not thread-safe as discussed
in the SPLIT_COUNTER declaration, and 2) the order of execution in parallel
streams could lead to non-deterministic split ID assignments. Consider using a
proper atomic counter or a different approach to generate unique split IDs.
```suggestion
    AtomicInteger splitIdCounter = new AtomicInteger(HoodieSourceSplit.SPLIT_COUNTER);
    List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> new HoodieSourceSplit(
        splitIdCounter.incrementAndGet(), split.getBasePath().orElse(null), split.getLogPaths(), split.getTablePath(), split.getMergeType(), split.getFileId())).collect(Collectors.toList());
```
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/***
+ * The position of {@link HoodieContinuousSplitEnumerator}.
+ */
+public class HoodieEnumeratorPosition implements Serializable {
+ private final String lastInstant;
+ private final String lastInstantCompletionTime;
+
+ static HoodieEnumeratorPosition empty() {
+ return new HoodieEnumeratorPosition(null, null);
+ }
+
+ static HoodieEnumeratorPosition of(String lastInstant, String lastInstantCompletionTime) {
+ return new HoodieEnumeratorPosition(lastInstant, lastInstantCompletionTime);
+
+ private HoodieEnumeratorPosition(String lastInstant, String lastInstantCompletionTime) {
+ this.lastInstant = lastInstant;
+ this.lastInstantCompletionTime = lastInstantCompletionTime;
+ }
+
+ String lastInstant() {
+ return lastInstant;
+ }
+
+ String lastInstantCompletionTime() {
Review Comment:
The lastInstant() and lastInstantCompletionTime() accessors are package-private, as are the empty() and of() factory methods. Since this class models the enumerator's checkpoint position and is consumed by HoodieContinuousSplitEnumerator (and potentially by code in other packages), the accessors should be public, with the factories widened to match, to present a consistent API.
```suggestion
public String lastInstant() {
return lastInstant;
}
public String lastInstantCompletionTime() {
```
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Result from continuous enumerator.
+ */
+public class HoodieContinuousSplitBatch {
+ public static HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), "", "");
Review Comment:
The empty string literals passed to EMPTY should use null instead for
semantic correctness. According to the javadoc comment on line 38, the
toInstant "can have null instant timestamp in String format". Using empty
strings instead of null values is inconsistent with the documented behavior and
could lead to confusion. The fromInstant comment also indicates it "can be
null".
```suggestion
    public static HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), null, null);
```
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for continuously discovery new splits from active timeline.
Review Comment:
The phrase "continuously discovery" pairs an adverb with a noun; after "for", the grammatical form is the gerund "continuously discovering".
```suggestion
 * Interface for continuously discovering new splits from active timeline.
```
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java:
##########
@@ -31,8 +31,8 @@
* Hoodie SourceSplit implementation for source V2.
*/
public class HoodieSourceSplit implements SourceSplit, Serializable {
+ public static int SPLIT_COUNTER = 0;
Review Comment:
The static mutable field SPLIT_COUNTER is not thread-safe. In a concurrent
environment like Flink, multiple threads could increment this counter
simultaneously, leading to race conditions and potentially duplicate split IDs.
This field should either be changed to an AtomicInteger or the counter
management should be handled differently.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -303,6 +305,26 @@ public Result inputSplits(
}
}
+ /**
+ * Returns the incremental Hoodie source split batch.
+ *
+ * @param metaClient The meta client
+ * @param startInstant The start Instant of the splits
+ * @param cdcEnabled Whether cdc is enabled
+ *
+ * @return The list of incremental input splits or empty if there are no new instants
+ */
+ public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+ HoodieTableMetaClient metaClient,
+ @Nullable String startInstant,
+ boolean cdcEnabled) {
+ Result result = inputSplits(metaClient, startInstant, cdcEnabled);
+ List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> new HoodieSourceSplit(
+ ++HoodieSourceSplit.SPLIT_COUNTER, split.getBasePath().orElse(null), split.getLogPaths(), split.getTablePath(), split.getMergeType(), split.getFileId())).collect(Collectors.toList());
Review Comment:
The line formatting violates code style conventions by placing multiple
operations on a single long line (map operation with constructor call). This
reduces readability. Consider breaking this into multiple lines with proper
indentation, placing the lambda expression body on separate lines.
```suggestion
    List<HoodieSourceSplit> splits = result.inputSplits.stream().map(split -> {
return new HoodieSourceSplit(
++HoodieSourceSplit.SPLIT_COUNTER,
split.getBasePath().orElse(null),
split.getLogPaths(),
split.getTablePath(),
split.getMergeType(),
split.getFileId()
);
}).collect(Collectors.toList());
```
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java:
##########
@@ -31,8 +31,8 @@
* Hoodie SourceSplit implementation for source V2.
*/
public class HoodieSourceSplit implements SourceSplit, Serializable {
+ public static int SPLIT_COUNTER = 0;
Review Comment:
The public static field SPLIT_COUNTER violates encapsulation and is
problematic for several reasons: 1) It's directly accessible and modifiable
from anywhere, 2) It doesn't reset between test runs which could cause test
interference, 3) It makes the class non-thread-safe. Consider making this field
private and providing controlled access through methods, or better yet, use a
different approach for generating split IDs that doesn't rely on mutable static
state.
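One sketch of such a different approach (assumed names throughout): let the component that creates splits own the counter, leaving HoodieSourceSplit free of static state.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical sketch: an enumerator-side assigner owns the counter, so each
// test can use a fresh instance and no global state leaks between runs.
public class SplitIdAssigner {
  private final AtomicInteger counter = new AtomicInteger(0);

  // Returns one unique id per element, in input order (sequential stream).
  public <T> List<Integer> assignIds(List<T> splits) {
    return splits.stream()
        .map(s -> counter.incrementAndGet())
        .collect(Collectors.toList());
  }
}
```

Because the counter is an instance field, resetting between test runs is just a matter of constructing a new assigner.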
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for continuously discovery new splits from active timeline.
+ */
+@Internal
+public interface HoodieContinuousSplitDiscover {
+
+  /** Discover the files appended between {@code lostInstant} and the last commit of current table */
Review Comment:
The typo "lostInstant" should be "lastInstant" in the javadoc comment.
```suggestion
  /** Discover the files appended between {@code lastInstant} and the last commit of current table */
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]