gemini-code-assist[bot] commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3355547470
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -240,54 +280,120 @@ public byte[] getCurrentRecordOffset() {
return activeReader.getCurrentRecordOffset();
}
+ public void clear() {
+ for (Work w : executedWorks) {
+ w.setOnFailureListener(null);
+ }
+ this.executedWorks = new ArrayList<>();
+ this.outputBuilders = new ArrayList<>();
+ this.accumulatedCallbacks = new HashMap<>();
+ this.workIsFailed = false;
+ this.sideInputCache.clear();
+ this.activeStateReader = null;
+ this.activeReader = null;
+ this.keyCoder = null;
+ this.workExecutor = null;
+ this.workQueueExecutor = null;
+ this.budgetHandle = null;
+ this.keySwitchListener = null;
+ }
Review Comment:

The `clear()` method resets the execution context but leaves `this.key` and
`this.work` set. `StreamingModeExecutionContext` instances are often cached or
pooled (e.g., in `ComputationState`), so a cleared context that still holds
these references keeps large `Work` and user key objects reachable and can
leak a significant amount of memory.
Please set `this.key` and `this.work` to `null` in `clear()` so that they can
be garbage collected.
```java
public void clear() {
  for (Work w : executedWorks) {
    w.setOnFailureListener(null);
  }
  this.executedWorks = new ArrayList<>();
  this.outputBuilders = new ArrayList<>();
  this.accumulatedCallbacks = new HashMap<>();
  this.workIsFailed = false;
  this.sideInputCache.clear();
  this.activeStateReader = null;
  this.activeReader = null;
  this.keyCoder = null;
  this.workExecutor = null;
  this.workQueueExecutor = null;
  this.budgetHandle = null;
  this.keySwitchListener = null;
  this.key = null;
  this.work = null;
}
```
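As a rough illustration of the invariant this comment asks for, the sketch below uses a hypothetical stand-in class (`PooledContextSketch`, not Beam's `StreamingModeExecutionContext`) whose `clear()` drops every per-work reference. The field names mirror the ones discussed above, but the class, its `start` method, and the `retainsAnything` check are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pooled execution context; not Beam's class.
final class PooledContextSketch {
  private Object key;   // stands in for the user key
  private Object work;  // stands in for the Work item
  private List<Object> executedWorks = new ArrayList<>();

  // Begin processing: the context now references per-work objects.
  void start(Object key, Object work) {
    this.key = key;
    this.work = work;
    this.executedWorks.add(work);
  }

  // Reset before returning the context to a pool: drop every per-work reference.
  void clear() {
    this.executedWorks = new ArrayList<>();
    this.key = null;
    this.work = null;
  }

  // True if a pooled (idle) context would still keep per-work objects reachable.
  boolean retainsAnything() {
    return key != null || work != null || !executedWorks.isEmpty();
  }
}
```

A check along the lines of `retainsAnything()` could back a unit test asserting that a cleared context holds no per-work state.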
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -235,8 +241,19 @@ public void setProcessingThreadName(String processingThreadName) {
}
@Override
- public void setFailed() {
+ public synchronized void setFailed() {
this.isFailed = true;
+ Runnable listener = onFailureListener;
+ if (listener != null) {
+ listener.run();
+ }
+ }
+
+ public synchronized void setOnFailureListener(@Nullable Runnable listener) {
+ this.onFailureListener = listener;
+ if (isFailed && listener != null) {
+ listener.run();
+ }
}
Review Comment:

Calling an external callback (`listener.run()`) while holding a monitor lock
(`synchronized`) is a known concurrency anti-pattern (often referred to as
calling an "alien method" under lock). If the listener or any downstream call
acquires another lock while a different thread holds that lock and is waiting
to enter one of `Work`'s synchronized methods, the two threads deadlock. (A
call back into `Work` from the same thread does not block, because Java
monitors are reentrant.)
To prevent this, read the listener under the lock and run it outside the
synchronized block.
```java
@Override
public void setFailed() {
  Runnable listener;
  synchronized (this) {
    this.isFailed = true;
    listener = onFailureListener;
  }
  if (listener != null) {
    listener.run();
  }
}

public void setOnFailureListener(@Nullable Runnable listener) {
  boolean alreadyFailed;
  synchronized (this) {
    this.onFailureListener = listener;
    alreadyFailed = isFailed;
  }
  if (alreadyFailed && listener != null) {
    listener.run();
  }
}
```
```
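The pattern suggested above can be exercised in isolation. The following is a minimal sketch using a hypothetical `FailableWorkSketch` class (a stand-in, not Beam's `Work`) that reads the listener under the monitor and runs it after releasing it. One trade-off worth noting: with the callback outside the lock, a listener captured by `setFailed()` may still run shortly after another thread has replaced it with `null`, so callers that unregister listeners should tolerate a late invocation.

```java
// Hypothetical stand-in for Work; only the failure-listener logic is modeled.
final class FailableWorkSketch {
  private boolean isFailed;            // guarded by this
  private Runnable onFailureListener;  // guarded by this

  // Marks the work failed, then notifies the listener outside the monitor.
  void setFailed() {
    Runnable listener;
    synchronized (this) {
      isFailed = true;
      listener = onFailureListener;
    }
    if (listener != null) {
      listener.run();
    }
  }

  // Registers a listener; if the work already failed, notifies it right away,
  // again outside the monitor.
  void setOnFailureListener(Runnable listener) {
    boolean alreadyFailed;
    synchronized (this) {
      onFailureListener = listener;
      alreadyFailed = isFailed;
    }
    if (alreadyFailed && listener != null) {
      listener.run();
    }
  }

  synchronized boolean isFailed() {
    return isFailed;
  }
}
```

Because both critical sections update and read the two fields atomically, a listener registered concurrently with `setFailed()` is still notified exactly once: whichever block runs second observes the other's write.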
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -441,20 +547,22 @@ public void setActiveReader(UnboundedReader<?> reader) {
/** Invalidate the state and reader caches for this computation and key. */
public void invalidateCache() {
- ByteString key = getSerializedKey();
- if (key != null) {
- readerCache.invalidateReader(getComputationKey());
- if (activeReader != null) {
- try {
- activeReader.close();
- } catch (IOException e) {
- LOG.warn(
- "Failed to close reader for {}-{}", computationId, getWorkItem().getShardingKey(), e);
- }
+ for (Work w : executedWorks) {
+ WindmillComputationKey compKey =
+ WindmillComputationKey.create(computationId, w.getShardedKey());
+ readerCache.invalidateReader(compKey);
+ stateCache.invalidate(w.getShardedKey());
+ }
+ if (activeReader != null) {
+ try {
+ activeReader.close();
+ } catch (IOException e) {
+ LOG.warn(
+ "Failed to close reader for {}-{}", computationId, getWorkItem().getShardingKey(), e);
}
Review Comment:

If `getWorkItem()` returns `null` (which is possible if the context is
cleared or not yet initialized), calling `getWorkItem().getShardingKey()`
inside the `catch` block will throw a `NullPointerException`. This will mask
the original `IOException` thrown by `activeReader.close()`, making debugging
very difficult.
Please add a null check for `getWorkItem()` before accessing the sharding
key.
```java
} catch (IOException e) {
  Windmill.WorkItem workItem = getWorkItem();
  long shardingKey = workItem != null ? workItem.getShardingKey() : -1L;
  LOG.warn(
      "Failed to close reader for {}-{}", computationId, shardingKey, e);
}
```
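To show the masking effect concretely, here is a small sketch with hypothetical stand-ins (`CloseLoggingSketch` and `WorkItemStub` are not Beam classes, and the methods return the message instead of logging it). The unsafe variant dereferences a possibly-null work item inside the `catch` block, so the `NullPointerException` replaces the `IOException`. The safe variant resolves the key first. It uses a string placeholder rather than the `-1L` sentinel from the suggestion, on the assumption that a numeric sentinel could be mistaken for a real sharding key.

```java
import java.io.Closeable;
import java.io.IOException;

final class CloseLoggingSketch {
  /** Hypothetical stand-in for a work item; only the accessor used here. */
  static final class WorkItemStub {
    private final long shardingKey;

    WorkItemStub(long shardingKey) {
      this.shardingKey = shardingKey;
    }

    long getShardingKey() {
      return shardingKey;
    }
  }

  /** Dereferences workItem inside the catch block; a null workItem hides the IOException. */
  static String closeUnsafe(Closeable reader, WorkItemStub workItem) {
    try {
      reader.close();
      return "closed";
    } catch (IOException e) {
      return "Failed to close reader for " + workItem.getShardingKey() + ": " + e.getMessage();
    }
  }

  /** Resolves the key null-safely, so the original failure is always reported. */
  static String closeSafe(Closeable reader, WorkItemStub workItem) {
    try {
      reader.close();
      return "closed";
    } catch (IOException e) {
      String shardingKey =
          workItem != null ? Long.toString(workItem.getShardingKey()) : "<unknown>";
      return "Failed to close reader for " + shardingKey + ": " + e.getMessage();
    }
  }
}
```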
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -354,27 +330,35 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize(
}
private void recordProcessingStats(
- Windmill.WorkItemCommitRequest.Builder outputBuilder,
- Windmill.WorkItem workItem,
- ExecuteWorkResult executeWorkResult) {
- // Compute shuffle and state byte statistics these will be flushed asynchronously.
- long stateBytesWritten =
- outputBuilder
- .clearOutputMessages()
- .clearPerWorkItemLatencyAttributions()
- .build()
- .getSerializedSize();
-
- streamingCounters.windmillShuffleBytesRead().addValue(computeShuffleBytesRead(workItem));
- streamingCounters.windmillStateBytesRead().addValue(executeWorkResult.stateBytesRead());
- streamingCounters.windmillStateBytesWritten().addValue(stateBytesWritten);
+ List<Work> workBatch,
+ List<Windmill.WorkItemCommitRequest.Builder> outputBuilders,
+ long totalStateBytesRead) {
+ long totalStateBytesWritten = 0;
+ long totalShuffleBytesRead = 0;
+ for (int i = 0; i < workBatch.size(); i++) {
+ Windmill.WorkItem workItem = workBatch.get(i).getWorkItem();
+ Windmill.WorkItemCommitRequest.Builder outputBuilder = outputBuilders.get(i);
+ // Compute shuffle and state byte statistics these will be flushed asynchronously.
+ long stateBytesWritten =
+ outputBuilder
+ .clearOutputMessages()
+ .clearPerWorkItemLatencyAttributions()
+ .build()
+ .getSerializedSize();
Review Comment:

Mutating the shared `outputBuilder` by calling `clearOutputMessages()` and
`clearPerWorkItemLatencyAttributions()` inside `recordProcessingStats`
introduces side-effects. If the order of execution in `processWork` is ever
changed, or if the builders are reused/inspected later, this will silently
discard the output messages and latency attributions.
To make this safer and more robust, clone the builder before clearing fields
to compute the size.
```suggestion
long stateBytesWritten =
    outputBuilder
        .clone()
        .clearOutputMessages()
        .clearPerWorkItemLatencyAttributions()
        .build()
        .getSerializedSize();
```
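The copy-then-clear idea can be sketched without protobuf. The class below is a hypothetical plain-Java builder (`CommitBuilderSketch`, with an assumed `copy()` method and a character count standing in for serialized size). It is not the generated `WorkItemCommitRequest.Builder` API. It only shows that measuring on a copy leaves the caller's builder untouched, at the cost of one extra copy per measurement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java builder; stands in for a generated protobuf builder.
final class CommitBuilderSketch {
  private final List<String> outputMessages = new ArrayList<>();
  private final List<String> stateWrites = new ArrayList<>();

  CommitBuilderSketch addOutputMessage(String message) {
    outputMessages.add(message);
    return this;
  }

  CommitBuilderSketch addStateWrite(String write) {
    stateWrites.add(write);
    return this;
  }

  // Mutates this builder, like a protobuf clearXxx() call would.
  CommitBuilderSketch clearOutputMessages() {
    outputMessages.clear();
    return this;
  }

  // Independent copy: later changes to the copy do not affect this builder.
  CommitBuilderSketch copy() {
    CommitBuilderSketch result = new CommitBuilderSketch();
    result.outputMessages.addAll(outputMessages);
    result.stateWrites.addAll(stateWrites);
    return result;
  }

  int outputMessageCount() {
    return outputMessages.size();
  }

  // Stand-in for getSerializedSize(): total characters across all fields.
  int sizeInChars() {
    int total = 0;
    for (String s : outputMessages) {
      total += s.length();
    }
    for (String s : stateWrites) {
      total += s.length();
    }
    return total;
  }

  // Measures state-only size on a copy, so the caller's builder is not mutated.
  static int stateSizeWithoutSideEffects(CommitBuilderSketch builder) {
    return builder.copy().clearOutputMessages().sizeInChars();
  }
}
```

Whether the extra copy is acceptable on this hot path depends on the size of the commit requests, which the comment above does not quantify.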
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.