gemini-code-assist[bot] commented on code in PR #39174:
URL: https://github.com/apache/beam/pull/39174#discussion_r3500363184
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java:
##########
@@ -271,9 +271,47 @@ public void processElement(ProcessContext context,
@StateId("count") ValueState<
private final String watchPathStr;
}
+ private static class AfterNumberOfNewOutputs
+ implements Watch.Growth.TerminationCondition<String, Integer> {
+ private final int numOutputs;
+
+ public AfterNumberOfNewOutputs(int numOutputs) {
+ this.numOutputs = numOutputs;
+ }
+
+ @Override
+ public org.apache.beam.sdk.coders.Coder<Integer> getStateCoder() {
+ return VarIntCoder.of();
+ }
+
+ @Override
+ public Integer forNewInput(org.joda.time.Instant now, String input) {
+ return 0;
+ }
+
+ @Override
+ public Integer onSeenNewOutput(org.joda.time.Instant now, Integer state) {
+ return state + 1;
+ }
Review Comment:

To prevent potential `NullPointerException` due to auto-unboxing, we should
defensively handle the case where `state` is null. Although `forNewInput`
initializes the state to `0`, in distributed execution or state restoration
scenarios, the state could potentially be null.
```suggestion
public Integer onSeenNewOutput(org.joda.time.Instant now, Integer state)
{
return (state == null ? 0 : state) + 1;
}
```
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java:
##########
@@ -271,9 +271,47 @@ public void processElement(ProcessContext context,
@StateId("count") ValueState<
private final String watchPathStr;
}
+ private static class AfterNumberOfNewOutputs
+ implements Watch.Growth.TerminationCondition<String, Integer> {
+ private final int numOutputs;
+
+ public AfterNumberOfNewOutputs(int numOutputs) {
+ this.numOutputs = numOutputs;
+ }
+
+ @Override
+ public org.apache.beam.sdk.coders.Coder<Integer> getStateCoder() {
+ return VarIntCoder.of();
+ }
+
+ @Override
+ public Integer forNewInput(org.joda.time.Instant now, String input) {
+ return 0;
+ }
+
+ @Override
+ public Integer onSeenNewOutput(org.joda.time.Instant now, Integer state) {
+ return state + 1;
+ }
+
+ @Override
+ public boolean canStopPolling(org.joda.time.Instant now, Integer state) {
+ return state >= numOutputs;
+ }
Review Comment:

To prevent potential `NullPointerException` due to auto-unboxing when
comparing `state` with `numOutputs`, we should defensively handle the case
where `state` is null.
```suggestion
public boolean canStopPolling(org.joda.time.Instant now, Integer state) {
return (state == null ? 0 : state) >= numOutputs;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]