gemini-code-assist[bot] commented on code in PR #38991:
URL: https://github.com/apache/beam/pull/38991#discussion_r3424543754


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java:
##########
@@ -116,13 +116,12 @@ public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, K
   @VisibleForTesting
   static class LateDataFilter {
     private final WindowingStrategy<?, ?> windowingStrategy;
-    private final TimerInternals timerInternals;
+    private final StepContext stepContext;
     private final Counter droppedDueToLateness;
 
-    public LateDataFilter(
-        WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
+    public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, StepContext stepContext) {
       this.windowingStrategy = windowingStrategy;
-      this.timerInternals = timerInternals;
+      this.stepContext = stepContext;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To prevent potential `NullPointerException`s later during execution, add a 
defensive null check for the newly introduced `stepContext` parameter in the 
constructor.
   
   ```suggestion
       public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, StepContext stepContext) {
         this.windowingStrategy = windowingStrategy;
         this.stepContext = java.util.Objects.requireNonNull(stepContext, "stepContext");
   ```
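
   As a standalone illustration of the fail-fast behaviour suggested above, here
   is a minimal sketch. `NullCheckSketch`, `Context`, and `Filter` are
   hypothetical stand-ins invented for this example and are not the Beam
   `StepContext` or `LateDataFilter` types; only `Objects.requireNonNull` is a
   real JDK call.

   ```java
   import java.util.Objects;

   // Hypothetical stand-ins for illustration only; these are not Beam classes.
   final class NullCheckSketch {
     interface Context {
       String name();
     }

     static final class Filter {
       private final Context context;

       Filter(Context context) {
         // Fails at construction time with a NullPointerException whose
         // message is "context", instead of failing later at first use.
         this.context = Objects.requireNonNull(context, "context");
       }

       String describe() {
         return "filter for " + context.name();
       }
     }

     public static void main(String[] args) {
       Filter ok = new Filter(() -> "step-1");
       System.out.println(ok.describe());
       try {
         new Filter(null);
       } catch (NullPointerException e) {
         System.out.println("rejected: " + e.getMessage());
       }
     }
   }
   ```

   The point of the check is that the stack trace of the failure points at the
   caller that passed `null`, rather than at whichever method happens to
   dereference the field first.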



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java:
##########
@@ -55,6 +55,19 @@ public class WindmillStateInternals<K> implements StateInternals {
   private final CachingStateTable workItemDerivedState;
   private final Supplier<Closeable> scopedReadStateSupplier;
 
+  private boolean poisoned = false;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Declare the `poisoned` flag as `volatile` so that writes to it are visible
   across threads. Execution contexts in streaming are typically
   single-threaded per key, but state and timer internals might be accessed or
   cleared across thread boundaries (e.g., during asynchronous committing or
   multi-threaded processing). A write to a `volatile` field happens-before
   every subsequent read of that field, so any thread that reads the flag after
   it has been set observes the update.
   
   ```suggestion
     private volatile boolean poisoned = false;
   ```
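
   The visibility argument above can be sketched in isolation. Everything in
   this example (`PoisonFlagSketch`, `Internals`, `poison()`,
   `checkNotPoisoned()`, `readerObservesPoison`) is a hypothetical name chosen
   for illustration; the PR's actual poisoning logic is not shown in this
   hunk, so the sketch only assumes that one thread sets the flag and another
   reads it.

   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   // Hypothetical illustration; not the Windmill internals from the PR.
   final class PoisonFlagSketch {
     static final class Internals {
       // volatile: a write here happens-before any later read of the field.
       private volatile boolean poisoned = false;

       void poison() {
         poisoned = true;
       }

       boolean isPoisoned() {
         return poisoned;
       }

       void checkNotPoisoned() {
         if (poisoned) {
           throw new IllegalStateException("internals used after being poisoned");
         }
       }
     }

     // Returns true if a second thread observed the flag within the timeout.
     static boolean readerObservesPoison(long timeoutMillis) {
       Internals internals = new Internals();
       CountDownLatch observed = new CountDownLatch(1);
       Thread reader = new Thread(() -> {
         // Busy-wait on the volatile read until the write becomes visible.
         while (!internals.isPoisoned()) {
           // spin
         }
         observed.countDown();
       });
       reader.setDaemon(true);
       reader.start();
       internals.poison();
       try {
         return observed.await(timeoutMillis, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
       }
     }
   }
   ```

   Without `volatile`, the Java memory model would permit the reader loop to
   keep seeing the stale `false` value indefinitely. If the flag really is
   confined to a single thread, the modifier is unnecessary but cheap.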



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java:
##########
@@ -64,6 +64,19 @@ class WindmillTimerInternals implements TimerInternals {
   private final Consumer<TimerData> onTimerModified;
   private final WindmillTagEncoding windmillTagEncoding;
 
+  private boolean poisoned = false;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Declare the `poisoned` flag as `volatile` so that writes to it are visible
   across threads. Execution contexts in streaming are typically
   single-threaded per key, but state and timer internals might be accessed or
   cleared across thread boundaries (e.g., during asynchronous committing or
   multi-threaded processing). A write to a `volatile` field happens-before
   every subsequent read of that field, so any thread that reads the flag after
   it has been set observes the update.
   
   ```suggestion
     private volatile boolean poisoned = false;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
