gemini-code-assist[bot] commented on code in PR #38988:
URL: https://github.com/apache/beam/pull/38988#discussion_r3460988199
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java:
##########
@@ -83,13 +97,19 @@ public <KeyT> void onTimer(
@Override
public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
+ if (!activeKey) {
+ // This means that there were no elements for this key. Try to unblock any queued elements.
+ onNewKey();
+ }
+
Preconditions.checkStateNotNull(sideInputProcessor).handleFinishKeyOrBundle();
simpleDoFnRunner.finishKey(key);
+ this.activeKey = false;
}
@Override
public void finishBundle() {
simpleDoFnRunner.finishBundle();
- sideInputProcessor.handleFinishBundle();
+ this.activeKey = false;
}
Review Comment:

To prevent memory leaks, `sideInputProcessor` should be set to `null` at the
end of `finishKey` and `finishBundle`. Otherwise, the runner will hold onto the
`StreamingSideInputProcessor` (and its associated fetcher and state) of the
last processed key indefinitely. Additionally, the `onTimer` method (not shown
in the diff but present in the class) also needs to call `onNewKey()` if
`!activeKey` to ensure the key context is initialized and any blocked elements
are processed before the timer runs.
```suggestion
  @Override
  public <KeyT extends @Nullable Object> void finishKey(KeyT key) {
    if (!activeKey) {
      // This means that there were no elements for this key. Try to unblock any queued elements.
      onNewKey();
    }

    Preconditions.checkStateNotNull(sideInputProcessor).handleFinishKeyOrBundle();
    simpleDoFnRunner.finishKey(key);
    this.activeKey = false;
    this.sideInputProcessor = null;
  }

  @Override
  public void finishBundle() {
    simpleDoFnRunner.finishBundle();
    this.activeKey = false;
    this.sideInputProcessor = null;
  }
```
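As a minimal, self-contained sketch of the lifecycle described above (all names here, including `PerKeyStateSketch`, `Runner`, and `Processor`, are hypothetical stand-ins and not the Beam classes), the runner creates its per-key processor lazily when a timer or `finishKey` arrives for a key with no elements, and drops the reference as soon as the key or bundle finishes:

```java
/** Hypothetical sketch; none of these types are the actual Beam classes. */
public class PerKeyStateSketch {

  /** Stand-in for a per-key processor that would own a fetcher and state. */
  static final class Processor {
    final String key;

    Processor(String key) {
      this.key = key;
    }
  }

  /** Stand-in for a runner that tracks whether a key is currently active. */
  static final class Runner {
    private Processor processor; // null whenever no key is active
    private boolean activeKey;

    private void onNewKey(String key) {
      processor = new Processor(key);
      activeKey = true;
    }

    void onTimer(String key) {
      if (!activeKey) {
        // Key has timers but no elements: initialize the key context first.
        onNewKey(key);
      }
      // The timer would be delivered to the user code here.
    }

    void finishKey(String key) {
      if (!activeKey) {
        onNewKey(key);
      }
      // Per-key finishing work would happen here.
      activeKey = false;
      processor = null; // release the per-key processor
    }

    void finishBundle() {
      activeKey = false;
      processor = null; // release anything still held at bundle end
    }

    boolean holdsProcessor() {
      return processor != null;
    }
  }

  public static void main(String[] args) {
    Runner runner = new Runner();
    runner.onTimer("k1");
    System.out.println(runner.holdsProcessor()); // prints "true"
    runner.finishKey("k1");
    System.out.println(runner.holdsProcessor()); // prints "false"
  }
}
```

Clearing the field in both `finishKey` and `finishBundle` keeps the sketch safe regardless of which callback ends the key's work.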
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java:
##########
@@ -293,23 +330,24 @@ public abstract void processTimer(
TimerInternals.TimerData timer,
Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
throws Exception;
- };
+ }
void processTimers(
TimerType mode,
DataflowExecutionContext.DataflowStepContext context,
Coder<BoundedWindow> windowCoder,
- Runnable startKey,
Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
throws Exception {
TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder);
-
if (timer != null && fnRunner == null) {
// If we need to run reallyStartBundle in here, we need to make sure to switch the state
// sampler into the start state.
try (Closeable start = operationContext.enterStart()) {
reallyStartBundle();
- startKey.run();
+ }
+
+ if (!activeKey) {
+ this.onStartKey.accept((K) context.stateInternals().getKey());
}
}
Review Comment:

The `if (!activeKey)` check and `onStartKey.accept(...)` call are nested
inside the `if (timer != null && fnRunner == null)` block. However, if
`fnRunner` is already initialized (which is the case when `reallyStartBundle`
is called in `startBundle`), `fnRunner == null` is false, so `onStartKey` is
never called even if `activeKey` is false. This means the side input processor
won't be initialized for keys that only have timers, leading to potential NPEs
or incorrect behavior. We should move the `if (!activeKey)` block outside of
the `fnRunner == null` check, but still within the `timer != null` check.
```java
if (timer != null) {
  if (fnRunner == null) {
    // If we need to run reallyStartBundle in here, we need to make sure to switch the state
    // sampler into the start state.
    try (Closeable start = operationContext.enterStart()) {
      reallyStartBundle();
    }
  }
  if (!activeKey) {
    this.onStartKey.accept((K) context.stateInternals().getKey());
  }
}
```
```
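To make the difference concrete, here is a small standalone sketch that models only the branching, not the real worker code. The class `TimerKeyStartSketch` and its boolean parameters are hypothetical; each method returns whether the key-start callback would fire for a given combination of conditions:

```java
/** Hypothetical model of the two control flows; not the Beam implementation. */
public class TimerKeyStartSketch {

  /** Nesting as in the diff: key start is reachable only when the runner is created lazily. */
  static boolean startsKeyNested(
      boolean timerPresent, boolean runnerInitialized, boolean activeKey) {
    boolean started = false;
    if (timerPresent && !runnerInitialized) {
      // reallyStartBundle() would run here.
      if (!activeKey) {
        started = true;
      }
    }
    return started;
  }

  /** Proposed shape: key start depends only on a timer being present and no key being active. */
  static boolean startsKeyHoisted(
      boolean timerPresent, boolean runnerInitialized, boolean activeKey) {
    boolean started = false;
    if (timerPresent) {
      if (!runnerInitialized) {
        // reallyStartBundle() would run here.
      }
      if (!activeKey) {
        started = true;
      }
    }
    return started;
  }

  public static void main(String[] args) {
    // Timer-only key, runner already initialized in startBundle, no active key.
    System.out.println(startsKeyNested(true, true, false)); // prints "false"
    System.out.println(startsKeyHoisted(true, true, false)); // prints "true"
  }
}
```

The two shapes agree whenever the runner is not yet initialized; they differ only in the case the comment calls out, where the runner already exists and the key has timers but no elements.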
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java:
##########
@@ -183,18 +156,18 @@ public void processTimers() throws Exception {
SimpleParDoFnHelpers.TimerType.FAIL_USER,
helpers.userStepContext,
windowCoder,
- this::onStartKey,
() -> sideInputProcessor);
helpers.processTimers(
SimpleParDoFnHelpers.TimerType.SYSTEM,
helpers.stepContext,
windowCoder,
- this::onStartKey,
() -> sideInputProcessor);
}
@Override
- public void finishKey(Object key) throws Exception {}
+ public void finishKey(Object key) throws Exception {
+ helpers.finishKey((K) key, sideInputProcessor);
+ }
Review Comment:

To prevent memory leaks, `sideInputProcessor` should be set to `null` after
`helpers.finishKey` is called, so that the
`StreamingKeyedWorkItemSideInputParDoFn` instance does not hold onto the stale
processor of the last processed key.
```suggestion
  @Override
  public void finishKey(Object key) throws Exception {
    helpers.finishKey((K) key, sideInputProcessor);
    this.sideInputProcessor = null;
  }
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -185,18 +171,18 @@ public void processTimers() throws Exception {
SimpleParDoFnHelpers.TimerType.USER,
helpers.userStepContext,
windowCoder,
- this::onStartKey,
() -> sideInputProcessor);
helpers.processTimers(
SimpleParDoFnHelpers.TimerType.SYSTEM,
helpers.stepContext,
windowCoder,
- this::onStartKey,
() -> sideInputProcessor);
}
@Override
- public void finishKey(Object key) throws Exception {}
+ public void finishKey(Object key) throws Exception {
+ helpers.finishKey(key, sideInputProcessor);
+ }
Review Comment:

To prevent memory leaks, `sideInputProcessor` should be set to `null` after
`helpers.finishKey` is called, so that the `SimpleParDoFn` instance does not
hold onto the stale processor of the last processed key.
```suggestion
  @Override
  public void finishKey(Object key) throws Exception {
    helpers.finishKey(key, sideInputProcessor);
    this.sideInputProcessor = null;
  }
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartialGroupByKeyParDoFns.java:
##########
@@ -378,15 +395,28 @@ public void processElement(Object elem) throws Exception {
}
@Override
- public void processTimers() {}
+ public void processTimers() throws Exception {
+ if (!activeKey) {
+ onStartKey();
+ }
+ }
@Override
- public void finishKey(Object key) throws Exception {}
+ public void finishKey(Object key) throws Exception {
+ if (!activeKey) {
+ onStartKey();
+ }
+ sideInputFetcher.persist();
+ sideInputFetcher = null;
+ this.activeKey = false;
+ }
@Override
public void finishBundle() throws Exception {
groupingTable.flush(receiver);
- sideInputFetcher.persist();
+ if (sideInputFetcher != null) {
+ sideInputFetcher.persist();
+ }
}
Review Comment:

To prevent memory leaks, `sideInputFetcher` should also be set to `null` in
`finishBundle` once it has been persisted.
```suggestion
  @Override
  public void finishBundle() throws Exception {
    groupingTable.flush(receiver);
    if (sideInputFetcher != null) {
      sideInputFetcher.persist();
      this.sideInputFetcher = null;
    }
  }
```