gemini-code-assist[bot] commented on code in PR #39091:
URL: https://github.com/apache/beam/pull/39091#discussion_r3468469999


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -71,54 +72,31 @@ public class BeamFnDataOutboundAggregator {
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
   private final int sizeLimit;
   private final long timeLimit;
-  // The instructionId is set between prepareForInstruction and 
finishInstruction/discard.
-  private @Nullable String instructionId = null;
-  @VisibleForTesting final Map<String, Receiver<?>> outputDataReceivers = new 
HashMap<>();
-  @VisibleForTesting final Map<TimerEndpoint, Receiver<?>> 
outputTimersReceivers = new HashMap<>();
-  @Nullable private StreamObserver<Elements> outboundObserver;
+  private final Supplier<String> processBundleRequestIdSupplier;
+  @VisibleForTesting final Map<String, Receiver<?>> outputDataReceivers;
+  @VisibleForTesting final Map<TimerEndpoint, Receiver<?>> 
outputTimersReceivers;
+  private final StreamObserver<Elements> outboundObserver;
   @Nullable @VisibleForTesting ScheduledFuture<?> flushFuture;
-  private long bytesWrittenSinceFlush = 0;
-  private final Object flushLock = new Object();
+  private long bytesWrittenSinceFlush;
+  private final Object flushLock;
   private final boolean collectElementsIfNoFlushes;
-  private boolean hasFlushedForBundle = false;
+  private boolean hasFlushedForBundle;
 
-  public BeamFnDataOutboundAggregator(PipelineOptions options, boolean 
collectElementsIfNoFlushes) {
+  public BeamFnDataOutboundAggregator(
+      PipelineOptions options,
+      Supplier<String> processBundleRequestIdSupplier,
+      StreamObserver<Elements> outboundObserver,
+      boolean collectElementsIfNoFlushes) {
     this.sizeLimit = getSizeLimit(options);
     this.timeLimit = getTimeLimit(options);
     this.collectElementsIfNoFlushes = collectElementsIfNoFlushes;
-  }
-
-  public void prepareForInstruction(
-      String instructionId, StreamObserver<Elements> outboundObserver) {
-    if (timeLimit > 0) {
-      synchronized (flushLock) {
-        checkState(this.instructionId == null && this.outboundObserver == 
null);
-        this.instructionId = instructionId;
-        this.outboundObserver = outboundObserver;
-      }
-    } else {
-      checkState(this.instructionId == null && this.outboundObserver == null);
-      this.instructionId = instructionId;
-      this.outboundObserver = outboundObserver;
-    }
-  }
-
-  public void finishInstruction() {
-    if (flushFuture != null) {
-      synchronized (flushLock) {
-        checkState(
-            this.instructionId != null && this.outboundObserver != null,
-            "instruction was not started or previously completed");
-        checkState(bytesWrittenSinceFlush == 0, "bytes were not flushed for 
instruction");
-        this.instructionId = null;
-        this.outboundObserver = null;
-      }
-    } else {
-      checkState(this.instructionId != null && this.outboundObserver != null);
-      checkState(bytesWrittenSinceFlush == 0, "bytes were not flushed for 
instruction");
-      this.instructionId = null;
-      this.outboundObserver = null;
-    }
+    this.outputDataReceivers = new HashMap<>();
+    this.outputTimersReceivers = new HashMap<>();
+    this.outboundObserver = outboundObserver;
+    this.processBundleRequestIdSupplier = processBundleRequestIdSupplier;

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `outboundObserver` and `processBundleRequestIdSupplier` parameters are 
stored in non-nullable fields and dereferenced directly in methods like 
`flushInternal`, `sendElements`, and 
`sendOrCollectBufferedDataAndFinishOutboundStreams` without any null checks. To 
prevent potential `NullPointerException`s, we should explicitly validate that 
they are not null using `Objects.requireNonNull`.
   
   ```suggestion
       this.outboundObserver = 
java.util.Objects.requireNonNull(outboundObserver, "outboundObserver");
       this.processBundleRequestIdSupplier = 
java.util.Objects.requireNonNull(processBundleRequestIdSupplier, 
"processBundleRequestIdSupplier");
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -137,43 +138,34 @@ private static void removeKeyRecursively(JsonNode node, 
String keyToRemove) {
   }
 
   public static void main(String[] args) throws Exception {
-    Function<String, @Nullable String> environmentVarGetter = System::getenv;
-    main(environmentVarGetter);
+    main(System::getenv);
   }
 
   @VisibleForTesting
-  public static void main(Function<String, @Nullable String> 
environmentVarGetter)
-      throws Exception {
+  public static void main(Function<String, String> environmentVarGetter) 
throws Exception {
     JvmInitializers.runOnStartup();
 
     Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
-        getApiServiceDescriptor(
-            checkNotNull(
-                environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
-                "LOGGING_API_SERVICE_DESCRIPTOR env var must be set."));
+        
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
     Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
-        getApiServiceDescriptor(
-            checkNotNull(
-                environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
-                "CONTROL_API_SERVICE_DESCRIPTOR env var must be set."));
-
-    @Nullable String envVar = 
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR);
-    Endpoints.@Nullable ApiServiceDescriptor statusApiServiceDescriptor =
-        (envVar == null) ? null : getApiServiceDescriptor(envVar);
-    String id =
-        checkNotNull(environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env 
var must be set.");
+        
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
+    Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
+        environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
+            ? null
+            : 
getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
+    String id = environmentVarGetter.apply(HARNESS_ID);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Removing the `checkNotNull` guards for required environment variables 
(`LOGGING_API_SERVICE_DESCRIPTOR`, `CONTROL_API_SERVICE_DESCRIPTOR`, and 
`HARNESS_ID`) makes the application less robust. If any of these are missing, 
the application will fail later with a generic `NullPointerException` instead 
of a clear, actionable error message. We should restore these safety checks 
using standard `java.util.Objects.requireNonNull`.
   
   ```suggestion
       Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
           getApiServiceDescriptor(
               java.util.Objects.requireNonNull(
                   environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
                   "LOGGING_API_SERVICE_DESCRIPTOR env var must be set."));
       Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
           getApiServiceDescriptor(
               java.util.Objects.requireNonNull(
                   environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
                   "CONTROL_API_SERVICE_DESCRIPTOR env var must be set."));
       Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
           environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
               ? null
               : 
getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
       String id =
           java.util.Objects.requireNonNull(
               environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var must be set.");
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -272,7 +263,7 @@ public static void main(
       Set<String> runnerCapabilites,
       Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
       Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
-      Endpoints.@Nullable ApiServiceDescriptor statusApiServiceDescriptor,
+      Endpoints.ApiServiceDescriptor statusApiServiceDescriptor,

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `statusApiServiceDescriptor` parameter can be `null` (as it is 
initialized to `null` if `STATUS_API_SERVICE_DESCRIPTOR` is not set in the 
environment). While the overload of `main` at line 222 correctly annotates it 
with `@Nullable`, this overload at line 266 is missing the annotation. We 
should add `@Nullable` to keep the annotations consistent and prevent static 
analysis warnings.
   
   ```suggestion
         Endpoints.@Nullable ApiServiceDescriptor statusApiServiceDescriptor,
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -187,12 +179,11 @@ public static void main(Function<String, @Nullable 
String> environmentVarGetter)
     } catch (Exception e) {
       System.out.format("Problem loading pipeline options from file: %s%n", 
e.getMessage());
     }
-    if (pipelineOptionsJson == null) {
-      pipelineOptionsJson = 
checkNotNull(environmentVarGetter.apply(PIPELINE_OPTIONS));
-    }
+
     System.out.format("Pipeline options %s%n", pipelineOptionsJson);
     // TODO: https://github.com/apache/beam/issues/30301
     pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, 
"impersonateServiceAccount");
+
     PipelineOptions options = 
PipelineOptionsTranslation.fromJson(pipelineOptionsJson);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If both `PIPELINE_OPTIONS` and `PIPELINE_OPTIONS_FILE` are missing or fail 
to load, `pipelineOptionsJson` will be `null`. Passing `null` to 
`removeNestedKey` or `PipelineOptionsTranslation.fromJson` will result in a 
`NullPointerException`. We should explicitly validate that 
`pipelineOptionsJson` is not null before proceeding.
   
   ```java
       java.util.Objects.requireNonNull(
           pipelineOptionsJson,
           "Either PIPELINE_OPTIONS env var must be set or "
               + "PIPELINE_OPTIONS_FILE must be set and valid.");
       System.out.format("Pipeline options %s%n", pipelineOptionsJson);
       // TODO: https://github.com/apache/beam/issues/30301
       pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, 
"impersonateServiceAccount");
   
       PipelineOptions options = 
PipelineOptionsTranslation.fromJson(pipelineOptionsJson);
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -328,7 +277,7 @@ void flush() {
 
   /** Check if the flush thread failed with an exception. */
   private void checkFlushThreadException() throws IOException {
-    if (flushFuture != null && flushFuture.isDone()) {
+    if (timeLimit > 0 && flushFuture.isDone()) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Since `flushFuture` is annotated with `@Nullable` and is only initialized 
when `start()` is called (if `timeLimit > 0`), dereferencing it directly as 
`flushFuture.isDone()` without a null check can lead to a 
`NullPointerException` (for example, if `checkFlushThreadException()` is called 
before `start()` or if `flushFuture` was not successfully initialized). We 
should add a null check for `flushFuture`.
   
   ```suggestion
       if (timeLimit > 0 && flushFuture != null && flushFuture.isDone()) {
   ```



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java:
##########
@@ -839,7 +839,7 @@ private class TestBeamFnDataOutboundAggregator extends 
BeamFnDataOutboundAggrega
       private Supplier<String> processBundleRequestIdSupplier;
 
       public TestBeamFnDataOutboundAggregator(Supplier<String> 
bundleIdSupplier) {
-        super(PipelineOptionsFactory.create(), false);
+        super(PipelineOptionsFactory.create(), bundleIdSupplier, null, false);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Passing `null` as the `outboundObserver` to the super constructor will cause 
`NullPointerException`s when any outbound data or timers are flushed, as the 
base class now dereferences it directly. Since Mockito is available in this 
test file, we should pass a mocked `StreamObserver` instead of `null`.
   
   ```java
         public TestBeamFnDataOutboundAggregator(Supplier<String> 
bundleIdSupplier) {
           super(
               PipelineOptionsFactory.create(),
               bundleIdSupplier,
               
org.mockito.Mockito.mock(org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver.class),
               false);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to