gemini-code-assist[bot] commented on code in PR #38991:
URL: https://github.com/apache/beam/pull/38991#discussion_r3424543754
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java:
##########
@@ -116,13 +116,12 @@ public <KeyT> void onWindowExpiration(BoundedWindow window, Instant timestamp, K
@VisibleForTesting
static class LateDataFilter {
private final WindowingStrategy<?, ?> windowingStrategy;
- private final TimerInternals timerInternals;
+ private final StepContext stepContext;
private final Counter droppedDueToLateness;
- public LateDataFilter(
- WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals) {
+ public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, StepContext stepContext) {
this.windowingStrategy = windowingStrategy;
- this.timerInternals = timerInternals;
+ this.stepContext = stepContext;
Review Comment:

To prevent potential `NullPointerException`s later during execution, add a
defensive null check for the newly introduced `stepContext` parameter in the
constructor.
```suggestion
public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, StepContext stepContext) {
  this.windowingStrategy = windowingStrategy;
  this.stepContext = java.util.Objects.requireNonNull(stepContext, "stepContext");
```
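
For context, a minimal, self-contained sketch of what the suggested check does. `FakeStepContext` and `FilterSketch` are hypothetical stand-ins, not Beam classes; only `java.util.Objects.requireNonNull` is the real API. Note that the check still raises a `NullPointerException`; the difference is that it is raised at construction time, with the parameter name as the message, rather than at a later dereference.

```java
import java.util.Objects;

class NullCheckDemo {
  // Returns "ok" if construction succeeds, otherwise the NPE message.
  static String tryConstruct(FakeStepContext ctx) {
    try {
      new FilterSketch(ctx);
      return "ok";
    } catch (NullPointerException e) {
      return "NPE: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(tryConstruct(new FakeStepContext() {}));
    System.out.println(tryConstruct(null));
  }
}

// Hypothetical stand-in for StepContext so the sketch compiles on its own.
interface FakeStepContext {}

// Hypothetical stand-in for LateDataFilter, reduced to the field under discussion.
final class FilterSketch {
  private final FakeStepContext stepContext;

  FilterSketch(FakeStepContext stepContext) {
    // Fails fast here instead of at the first use of the field.
    this.stepContext = Objects.requireNonNull(stepContext, "stepContext");
  }

  FakeStepContext stepContext() {
    return stepContext;
  }
}
```

Whether a fail-fast check is wanted here depends on whether any existing caller (for example, a test) passes `null` deliberately.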
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java:
##########
@@ -55,6 +55,19 @@ public class WindmillStateInternals<K> implements StateInternals {
private final CachingStateTable workItemDerivedState;
private final Supplier<Closeable> scopedReadStateSupplier;
+ private boolean poisoned = false;
Review Comment:

Declare the `poisoned` flag as `volatile` so that writes to it are visible
across threads. Execution contexts in streaming are typically single-threaded
per key, but state and timer internals might be accessed or cleared across
thread boundaries (e.g., during asynchronous committing or multi-threaded
processing). Under the Java Memory Model, a write to a `volatile` field
happens-before every subsequent read of that field, so a thread that reads the
flag after another thread sets it sees the updated value.
```suggestion
private volatile boolean poisoned = false;
```
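
A rough sketch of the visibility guarantee in isolation, assuming one thread sets the flag while another polls it. `PoisonFlagDemo` and `Internals` are hypothetical and do not mirror the real `WindmillStateInternals`; whether the flag in this PR is ever read from a second thread is not established by the diff.

```java
class PoisonFlagDemo {
  // Hypothetical holder for the flag; only the field declaration matches the suggestion.
  static final class Internals {
    private volatile boolean poisoned = false;

    void poison() {
      poisoned = true;
    }

    boolean isPoisoned() {
      return poisoned;
    }
  }

  // Starts a reader that spins until it sees the flag, then sets the flag
  // from the calling thread. Returns true if the reader exited in time.
  static boolean readerObservesPoison() {
    Internals internals = new Internals();
    Thread reader =
        new Thread(
            () -> {
              while (!internals.isPoisoned()) {
                // Busy-wait; each iteration performs a fresh volatile read.
              }
            });
    reader.setDaemon(true);
    reader.start();
    internals.poison();
    try {
      reader.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return !reader.isAlive();
  }

  public static void main(String[] args) {
    System.out.println("reader observed poison: " + readerObservesPoison());
  }
}
```

With a plain `boolean`, the Java Memory Model would allow the reader loop to keep seeing a stale `false`; `volatile` rules that out. It does not make compound check-then-act sequences atomic, so it is only sufficient if the flag is set once and merely read elsewhere.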
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java:
##########
@@ -64,6 +64,19 @@ class WindmillTimerInternals implements TimerInternals {
private final Consumer<TimerData> onTimerModified;
private final WindmillTagEncoding windmillTagEncoding;
+ private boolean poisoned = false;
Review Comment:

Declare the `poisoned` flag as `volatile` so that writes to it are visible
across threads. Execution contexts in streaming are typically single-threaded
per key, but state and timer internals might be accessed or cleared across
thread boundaries (e.g., during asynchronous committing or multi-threaded
processing). Under the Java Memory Model, a write to a `volatile` field
happens-before every subsequent read of that field, so a thread that reads the
flag after another thread sets it sees the updated value.
```suggestion
private volatile boolean poisoned = false;
```