gemini-code-assist[bot] commented on code in PR #38988:
URL: https://github.com/apache/beam/pull/38988#discussion_r3460988199


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java:
##########
@@ -83,13 +97,19 @@ public <KeyT> void onTimer(
 
   @Override
   public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
+    if (!activeKey) {
+      // This means that there were no elements for this key. Try to unblock any queued elements.
+      onNewKey();
+    }
+    Preconditions.checkStateNotNull(sideInputProcessor).handleFinishKeyOrBundle();
     simpleDoFnRunner.finishKey(key);
+    this.activeKey = false;
   }
 
   @Override
   public void finishBundle() {
     simpleDoFnRunner.finishBundle();
-    sideInputProcessor.handleFinishBundle();
+    this.activeKey = false;
   }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   To prevent memory leaks, `sideInputProcessor` should be set to `null` at the
   end of `finishKey` and `finishBundle`. Otherwise, the runner keeps the
   `StreamingSideInputProcessor` (and its associated fetcher and state) of the
   last processed key reachable until another key replaces it, or for as long
   as the runner lives if no further key arrives. The `onTimer` method (not
   shown in the diff but present in the class) also needs to call `onNewKey()`
   when `!activeKey`, so that the key context is initialized and any blocked
   elements are processed before the timer runs.
   
   ```suggestion
     @Override
     public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
       if (!activeKey) {
         // This means that there were no elements for this key. Try to unblock any queued elements.
         onNewKey();
       }
       Preconditions.checkStateNotNull(sideInputProcessor).handleFinishKeyOrBundle();
       simpleDoFnRunner.finishKey(key);
       this.activeKey = false;
       this.sideInputProcessor = null;
     }
   
     @Override
     public void finishBundle() {
       simpleDoFnRunner.finishBundle();
       this.activeKey = false;
       this.sideInputProcessor = null;
     }
   ```
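   The per-key lifecycle described above can be modeled in isolation. This is a
   minimal sketch, not the Beam API: `KeyedRunnerSketch` and `PerKeyProcessor`
   are hypothetical stand-ins for `StreamingSideInputDoFnRunner` and
   `StreamingSideInputProcessor`, and passing the key to `onNewKey` is an
   assumption. The sketch initializes the key lazily on the first element,
   timer, or `finishKey`, and it clears the processor field in both `finishKey`
   and `finishBundle` so that no processor outlives its key.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for StreamingSideInputProcessor: per-key state. */
   final class PerKeyProcessor {
     final String key;
     boolean finished = false;

     PerKeyProcessor(String key) {
       this.key = key;
     }

     void handleFinishKeyOrBundle() {
       finished = true;
     }
   }

   /** Hypothetical, simplified model of the runner's per-key lifecycle. */
   final class KeyedRunnerSketch {
     private boolean activeKey = false;
     private PerKeyProcessor sideInputProcessor = null; // null between keys
     final List<String> log = new ArrayList<>();

     // Lazily set up per-key state (the real runner also unblocks queued elements here).
     private void onNewKey(String key) {
       sideInputProcessor = new PerKeyProcessor(key);
       activeKey = true;
       log.add("start:" + key);
     }

     void processElement(String key, String element) {
       if (!activeKey) {
         onNewKey(key);
       }
       log.add("element:" + element);
     }

     // A timer-only key must also initialize the key context before the timer runs.
     void onTimer(String key, String timerId) {
       if (!activeKey) {
         onNewKey(key);
       }
       log.add("timer:" + timerId);
     }

     void finishKey(String key) {
       if (!activeKey) {
         onNewKey(key); // no elements or timers were seen for this key
       }
       sideInputProcessor.handleFinishKeyOrBundle();
       log.add("finish:" + key);
       activeKey = false;
       sideInputProcessor = null; // drop the reference so it cannot outlive the key
     }

     void finishBundle() {
       activeKey = false;
       sideInputProcessor = null;
     }

     boolean holdsProcessor() {
       return sideInputProcessor != null;
     }
   }
   ```

   In this model, a timer-only key goes through `onTimer` and then `finishKey`.
   The processor is created on the first call and released on the second.
   After `finishBundle`, `holdsProcessor()` is false.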



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java:
##########
@@ -293,23 +330,24 @@ public abstract void processTimer(
         TimerInternals.TimerData timer,
         Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
         throws Exception;
-  };
+  }
 
   void processTimers(
       TimerType mode,
       DataflowExecutionContext.DataflowStepContext context,
       Coder<BoundedWindow> windowCoder,
-      Runnable startKey,
       Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
       throws Exception {
     TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder);
-
     if (timer != null && fnRunner == null) {
       // If we need to run reallyStartBundle in here, we need to make sure to switch the state
       // sampler into the start state.
       try (Closeable start = operationContext.enterStart()) {
         reallyStartBundle();
-        startKey.run();
+      }
+
+      if (!activeKey) {
+        this.onStartKey.accept((K) context.stateInternals().getKey());
       }
     }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `if (!activeKey)` check and the `onStartKey.accept(...)` call are nested
   inside the `if (timer != null && fnRunner == null)` block. If `fnRunner` is
   already initialized (which is the case when `reallyStartBundle` was called
   from `startBundle`), `fnRunner == null` is false, so `onStartKey` is never
   called, even when `activeKey` is false. As a result, the side input
   processor is not initialized for keys that have only timers, which can lead
   to NPEs or incorrect behavior. The `if (!activeKey)` block should move out
   of the `fnRunner == null` check while staying inside the `timer != null`
   check:
   
   ```java
       if (timer != null) {
         if (fnRunner == null) {
           // If we need to run reallyStartBundle in here, we need to make sure to switch the state
           // sampler into the start state.
           try (Closeable start = operationContext.enterStart()) {
             reallyStartBundle();
           }
         }
   
         if (!activeKey) {
           this.onStartKey.accept((K) context.stateInternals().getKey());
         }
       }
   ```
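   The difference between the two shapes of the guard can be reproduced in a
   small model. The sketch is hypothetical: `TimerGuardSketch`,
   `fnRunnerInitialized`, and the `startKeyCalls` counter stand in for the real
   state of `SimpleParDoFnHelpers`. A fired timer is reduced to a nullable
   string, and the `operationContext.enterStart()` scope is omitted. When the
   runner was already created in `startBundle`, the nested version never starts
   the key, while the restructured version starts it exactly once.

   ```java
   /** Hypothetical model of the processTimers() guard; not the Beam API. */
   final class TimerGuardSketch {
     boolean fnRunnerInitialized;
     boolean activeKey = false;
     int startKeyCalls = 0;

     TimerGuardSketch(boolean fnRunnerInitialized) {
       this.fnRunnerInitialized = fnRunnerInitialized;
     }

     private void reallyStartBundle() {
       fnRunnerInitialized = true;
     }

     private void onStartKey() {
       startKeyCalls++;
       activeKey = true;
     }

     // Shape in the PR: the key check only runs when the runner is created lazily here.
     void processTimerNested(String timer) {
       if (timer != null && !fnRunnerInitialized) {
         reallyStartBundle();
         if (!activeKey) {
           onStartKey();
         }
       }
     }

     // Suggested shape: create the runner if needed, then start the key for any fired timer.
     void processTimerFixed(String timer) {
       if (timer != null) {
         if (!fnRunnerInitialized) {
           reallyStartBundle();
         }
         if (!activeKey) {
           onStartKey();
         }
       }
     }
   }
   ```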



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java:
##########
@@ -183,18 +156,18 @@ public void processTimers() throws Exception {
         SimpleParDoFnHelpers.TimerType.FAIL_USER,
         helpers.userStepContext,
         windowCoder,
-        this::onStartKey,
         () -> sideInputProcessor);
     helpers.processTimers(
         SimpleParDoFnHelpers.TimerType.SYSTEM,
         helpers.stepContext,
         windowCoder,
-        this::onStartKey,
         () -> sideInputProcessor);
   }
 
   @Override
-  public void finishKey(Object key) throws Exception {}
+  public void finishKey(Object key) throws Exception {
+    helpers.finishKey((K) key, sideInputProcessor);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To prevent memory leaks, `sideInputProcessor` should be set to `null` after
   `helpers.finishKey` is called, so that the
   `StreamingKeyedWorkItemSideInputParDoFn` instance does not hold onto the
   stale processor of the last processed key.
   
   ```suggestion
     @Override
     public void finishKey(Object key) throws Exception {
       helpers.finishKey((K) key, sideInputProcessor);
       this.sideInputProcessor = null;
     }
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -185,18 +171,18 @@ public void processTimers() throws Exception {
         SimpleParDoFnHelpers.TimerType.USER,
         helpers.userStepContext,
         windowCoder,
-        this::onStartKey,
         () -> sideInputProcessor);
     helpers.processTimers(
         SimpleParDoFnHelpers.TimerType.SYSTEM,
         helpers.stepContext,
         windowCoder,
-        this::onStartKey,
         () -> sideInputProcessor);
   }
 
   @Override
-  public void finishKey(Object key) throws Exception {}
+  public void finishKey(Object key) throws Exception {
+    helpers.finishKey(key, sideInputProcessor);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To prevent memory leaks, `sideInputProcessor` should be set to `null` after
   `helpers.finishKey` is called, so that the `SimpleParDoFn` instance does not
   hold onto the stale processor of the last processed key.
   
   ```suggestion
     @Override
     public void finishKey(Object key) throws Exception {
       helpers.finishKey(key, sideInputProcessor);
       this.sideInputProcessor = null;
     }
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java:
##########
@@ -378,15 +395,28 @@ public void processElement(Object elem) throws Exception {
     }
 
     @Override
-    public void processTimers() {}
+    public void processTimers() throws Exception {
+      if (!activeKey) {
+        onStartKey();
+      }
+    }
 
     @Override
-    public void finishKey(Object key) throws Exception {}
+    public void finishKey(Object key) throws Exception {
+      if (!activeKey) {
+        onStartKey();
+      }
+      sideInputFetcher.persist();
+      sideInputFetcher = null;
+      this.activeKey = false;
+    }
 
     @Override
     public void finishBundle() throws Exception {
       groupingTable.flush(receiver);
-      sideInputFetcher.persist();
+      if (sideInputFetcher != null) {
+        sideInputFetcher.persist();
+      }
     }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To prevent memory leaks, `sideInputFetcher` should also be set to `null` in
   `finishBundle` after it is persisted, as `finishKey` already does.
   
   ```suggestion
       @Override
       public void finishBundle() throws Exception {
         groupingTable.flush(receiver);
         if (sideInputFetcher != null) {
           sideInputFetcher.persist();
           this.sideInputFetcher = null;
         }
       }
   ```
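   The interaction between `finishKey` and `finishBundle` can be modeled in a
   small sketch. It assumes a hypothetical `FetcherStub` that only counts
   `persist()` calls, because the real `sideInputFetcher` type and its
   construction are not shown in the diff. `PgbkSketch` is likewise a
   simplified stand-in for the DoFn in `PartialGroupByKeyParDoFns`.

   Because `finishKey` already nulls the field, the guard in `finishBundle`
   avoids an NPE and a second `persist()`. Clearing the field there as well
   releases a fetcher that is still set when the bundle ends. The sketch also
   resets `activeKey` in `finishBundle`, mirroring
   `StreamingSideInputDoFnRunner.finishBundle` in the first hunk. Whether the
   real DoFn needs that reset depends on code not shown here.

   ```java
   /** Hypothetical fetcher that only records how often it was persisted. */
   final class FetcherStub {
     int persistCalls = 0;

     void persist() {
       persistCalls++;
     }
   }

   /** Hypothetical, simplified model of the PGBK DoFn's per-key side input state. */
   final class PgbkSketch {
     FetcherStub sideInputFetcher = null;
     boolean activeKey = false;

     private void onStartKey() {
       sideInputFetcher = new FetcherStub();
       activeKey = true;
     }

     void processElement(Object elem) {
       if (!activeKey) {
         onStartKey();
       }
     }

     void finishKey(Object key) {
       if (!activeKey) {
         onStartKey();
       }
       sideInputFetcher.persist();
       sideInputFetcher = null;
       activeKey = false;
     }

     void finishBundle() {
       // groupingTable.flush(receiver) would run here in the real DoFn.
       if (sideInputFetcher != null) {
         sideInputFetcher.persist();
         sideInputFetcher = null; // the suggested addition
       }
       activeKey = false; // assumption: mirrors StreamingSideInputDoFnRunner.finishBundle
     }
   }
   ```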



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
