gemini-code-assist[bot] commented on code in PR #37831:
URL: https://github.com/apache/beam/pull/37831#discussion_r2921086433


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -1804,6 +1804,11 @@ public <T> void outputWindowedValue(
       outputTo(consumer, WindowedValues.of(output, timestamp, windows, 
paneInfo));
     }
 
+    @Override
+    public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
+      return currentElement.causedByDrain();
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This `causedByDrain(DoFn<InputT, OutputT> doFn)` implementation is also 
added to `NonWindowObservingProcessBundleContextBase`. To avoid code 
duplication, consider moving this method to the common base class 
`ProcessBundleContextBase`.



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java:
##########
@@ -462,10 +462,24 @@ public String getErrorContext() {
         elementState.readLater();
         restrictionState.readLater();
         watermarkEstimatorState.readLater();
-        elementAndRestriction = KV.of(elementState.read(), 
restrictionState.read());
+        WindowedValue<InputT> read = elementState.read();
+        if (timer.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
+          read =
+              WindowedValues.of(
+                  read.getValue(),
+                  read.getTimestamp(),
+                  read.getWindows(),
+                  read.getPaneInfo(),
+                  read.getRecordId(),
+                  read.getRecordOffset(),
+                  CausedByDrain.CAUSED_BY_DRAIN);
+        }
+        elementAndRestriction = KV.of(read, restrictionState.read());
         watermarkEstimatorStateT = watermarkEstimatorState.read();
       }
 
+      // elementAndRestriction should be patched with drain info

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This comment appears to be a leftover development note and can be removed to 
keep the code clean.



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java:
##########
@@ -598,15 +612,16 @@ public String getErrorContext() {
       Instant wakeupTime =
           
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
       holdState.add(futureOutputWatermark);
-      // Set a timer to continue processing this element.
-      // todo radoslws@ decide if draining should be set on timer
-      timerInternals.setTimer(
+      // Set a timer to continue processing this element, but only when no 
drain
+      if(timer.causedByDrain() == CausedByDrain.NORMAL) {

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The `timer` variable can be `null` on the initial processing call (the seed 
call), which will cause a `NullPointerException` here. The condition should be 
updated to handle the `null` case, for example: `if (timer == null || 
timer.causedByDrain() == CausedByDrain.NORMAL)`.
   
   ```suggestion
         if (timer == null || timer.causedByDrain() == CausedByDrain.NORMAL) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to