gemini-code-assist[bot] commented on code in PR #38863:
URL: https://github.com/apache/beam/pull/38863#discussion_r3403767505
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -166,7 +191,7 @@ private void flushInternal() {
}
Elements.Builder elements = convertBufferForTransmission();
if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
- outboundObserver.onNext(elements.build());
+ checkNotNull(outboundObserver).onNext(elements.build());
Review Comment:

Since `outboundObserver` is now mutable and set to `null` when an
instruction finishes, the background flushing thread can encounter a `null`
observer if a flush occurs after `finishInstruction()` is called. Instead of
throwing a `NullPointerException` via `checkNotNull`, we should read the field
once into a local variable and check that local for `null` before calling
`onNext`. Reading the field twice would let it be cleared between the check
and the call.
```java
StreamObserver<Elements> observer = outboundObserver;
if (observer != null) {
observer.onNext(elements.build());
}
```
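A minimal sketch of why the single read matters, using a hypothetical `NullSafeFlusher` class (not part of Beam) in which `Consumer<String>` stands in for `StreamObserver<Elements>`. It assumes `finishInstruction()` can clear the field while the background thread is inside `flush()`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the aggregator. `observer` stands in for
// `outboundObserver`, and Consumer<String> replaces StreamObserver<Elements>.
class NullSafeFlusher {
  // volatile: written by the instruction thread, read by the background flusher.
  private volatile Consumer<String> observer;
  private final List<String> buffer = new ArrayList<>();

  void startInstruction(Consumer<String> newObserver) {
    observer = newObserver;
  }

  // May run concurrently with flush(); deliberately not synchronized here.
  void finishInstruction() {
    observer = null;
  }

  synchronized void add(String element) {
    buffer.add(element);
  }

  // Called periodically by the background flushing thread.
  synchronized void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Read the field exactly once; re-reading it could observe null after the check.
    Consumer<String> current = observer;
    if (current == null) {
      // The instruction already finished: skip the send instead of throwing.
      return;
    }
    current.accept(String.join(",", buffer));
    buffer.clear();
  }
}
```

What should happen to elements still buffered when the observer is gone is a separate question this sketch leaves open.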
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -207,30 +233,29 @@ public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() {
TimerEndpoint timerKey = entry.getKey();
bufferedElements
.addTimersBuilder()
- .setInstructionId(processBundleRequestIdSupplier.get())
+ .setInstructionId(instructionId)
.setTransformId(timerKey.pTransformId)
.setTimerFamilyId(timerKey.timerFamilyId)
.setIsLast(true);
entry.getValue().resetStats();
}
+ // This is the end of the bundle so we reset state to prepare for future bundles.
if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
return bufferedElements.build();
}
- outboundObserver.onNext(bufferedElements.build());
- // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
- // bundles.
+ checkNotNull(outboundObserver).onNext(bufferedElements.build());
hasFlushedForBundle = false;
return null;
Review Comment:

Since `BeamFnDataOutboundAggregator` is now reused across multiple bundles,
we must reset `bytesWrittenSinceFlush` to `0` at the end of each bundle.
Otherwise, `bytesWrittenSinceFlush` stays non-zero after the bundle ends, so
the background flushing thread may attempt a flush after the instruction has
finished, when `outboundObserver` is `null`. That can throw a
`NullPointerException` from `checkNotNull(outboundObserver)` or cause other
unexpected behavior.
```java
// This is the end of the bundle so we reset state to prepare for future bundles.
bytesWrittenSinceFlush = 0L;
if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
return bufferedElements.build();
}
checkNotNull(outboundObserver).onNext(bufferedElements.build());
hasFlushedForBundle = false;
return null;
```
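A simplified model of the failure mode, assuming a background flusher that sends whenever `bytesWrittenSinceFlush > 0`. `BundleAggregatorSketch`, `flushIfNeeded()`, `finishBundle()` and the size-based trigger are hypothetical stand-ins, not Beam's API; `Objects.requireNonNull` plays the role of `checkNotNull`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical, simplified model of the per-bundle bookkeeping. Field names mirror
// the review comment; List<String> stands in for the Elements proto.
class BundleAggregatorSketch {
  private final long sizeLimit;
  private final List<String> buffer = new ArrayList<>();
  private long bytesWrittenSinceFlush = 0L;
  private boolean hasFlushedForBundle = false;
  private Consumer<List<String>> outboundObserver;

  BundleAggregatorSketch(long sizeLimit) {
    this.sizeLimit = sizeLimit;
  }

  synchronized void startInstruction(Consumer<List<String>> observer) {
    outboundObserver = observer;
  }

  synchronized void finishInstruction() {
    outboundObserver = null;
  }

  synchronized void accept(String element) {
    buffer.add(element);
    bytesWrittenSinceFlush += element.length();
    if (bytesWrittenSinceFlush >= sizeLimit) {
      flushInternal();
    }
  }

  // What a background flusher would call on a timer.
  synchronized void flushIfNeeded() {
    if (bytesWrittenSinceFlush > 0) {
      flushInternal();
    }
  }

  private void flushInternal() {
    // Like checkNotNull, this throws NullPointerException once the observer is cleared.
    Objects.requireNonNull(outboundObserver, "outboundObserver").accept(new ArrayList<>(buffer));
    buffer.clear();
    bytesWrittenSinceFlush = 0L;
    hasFlushedForBundle = true;
  }

  // Returns the remaining elements if nothing was flushed during the bundle, else null.
  synchronized List<String> finishBundle(boolean collectElementsIfNoFlushes) {
    List<String> remaining = new ArrayList<>(buffer);
    buffer.clear();
    // End of bundle: reset the counter so a later flushIfNeeded() is a no-op.
    bytesWrittenSinceFlush = 0L;
    if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
      return remaining;
    }
    Objects.requireNonNull(outboundObserver, "outboundObserver").accept(remaining);
    hasFlushedForBundle = false;
    return null;
  }
}
```

Deleting the `bytesWrittenSinceFlush = 0L;` line in `finishBundle()` makes a `flushIfNeeded()` call after `finishInstruction()` throw, which is the behavior described above.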
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -1295,6 +1316,11 @@ message LogControl {}
service BeamFnLogging {
// Allows for the SDK to emit log entries which the runner can
// associate with the active job.
+ //
+ // Used to get/append/clear state stored by the runner on behalf of the SDK.
+ //
+ // Header metadata has the specified keys pairs:
+ // - "worker_id": the id of the sdk
Review Comment:

The comment block contains copy-pasted text from the `BeamFnState` service
("Used to get/append/clear state...") which is incorrect for `BeamFnLogging`.
Additionally, there is a typo "specified keys pairs" which should be "specified
key-value pairs".
```suggestion
// Header metadata has the specified key-value pairs:
// - "worker_id": the id of the sdk
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -138,34 +137,42 @@ private static void removeKeyRecursively(JsonNode node, String keyToRemove) {
}
public static void main(String[] args) throws Exception {
- main(System::getenv);
+ Function<String, @Nullable String> environmentVarGetter = System::getenv;
+ main(environmentVarGetter);
}
@VisibleForTesting
- public static void main(Function<String, String> environmentVarGetter) throws Exception {
+ public static void main(Function<String, @Nullable String> environmentVarGetter)
+ throws Exception {
JvmInitializers.runOnStartup();
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
- getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
+ getApiServiceDescriptor(
+ checkNotNull(
+ environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
+ "LOGGING_API_SERVICE_DESCRIPTOR env var be set."));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
- getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
+ getApiServiceDescriptor(
+ checkNotNull(
+ environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
+ "CONTROL_API_SERVICE_DESCRIPTOR env var be set."));
+
+ @Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR);
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
- environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
- ? null
- : getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
- String id = environmentVarGetter.apply(HARNESS_ID);
+ (envVar == null) ? null : getApiServiceDescriptor(envVar);
+ String id = checkNotNull(environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var be set.");
Review Comment:

The error messages in `checkNotNull` are missing the word "must" (e.g.,
`"LOGGING_API_SERVICE_DESCRIPTOR env var be set."`). They should be updated to
`"... env var must be set."` for grammatical correctness and clarity.
```suggestion
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
"LOGGING_API_SERVICE_DESCRIPTOR env var must be set."));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(
checkNotNull(
environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
"CONTROL_API_SERVICE_DESCRIPTOR env var must be set."));
@Nullable String envVar = environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR);
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
(envVar == null) ? null : getApiServiceDescriptor(envVar);
String id =
checkNotNull(
environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var must be set.");
```
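If more required variables are added later, a small helper would keep these messages consistent. This is a hypothetical sketch: `EnvConfig.requireEnv` is not an existing Beam method, and it uses `java.util.Objects.requireNonNull` rather than Guava's `checkNotNull` to stay dependency-free. Taking the getter as a `Function` mirrors how the `@VisibleForTesting` overload of `main` accepts one:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper; FnHarness itself calls checkNotNull inline.
final class EnvConfig {
  private EnvConfig() {}

  // Fails fast with a grammatical message when a required variable is missing.
  static String requireEnv(Function<String, String> env, String name) {
    return Objects.requireNonNull(env.apply(name), name + " env var must be set.");
  }
}
```

In production the getter would be `System::getenv`, e.g. `EnvConfig.requireEnv(System::getenv, "HARNESS_ID")`; tests can pass a map lookup instead.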
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java:
##########
@@ -44,17 +46,42 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
- private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer>
- multiplexerCache;
+ private static class MultiplexerKey {
+ private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+ private final String dataStreamId;
+
+ private MultiplexerKey(
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
+ this.apiServiceDescriptor = apiServiceDescriptor;
+ this.dataStreamId = dataStreamId;
+ }
Review Comment:

To prevent creating duplicate `BeamFnDataGrpcMultiplexer` instances and
separate gRPC connections for `null` and empty (`""`) stream IDs, we should
normalize `dataStreamId` to a consistent value (e.g., empty string) in the
`MultiplexerKey` constructor.
```suggestion
private MultiplexerKey(
Endpoints.ApiServiceDescriptor apiServiceDescriptor, @Nullable String dataStreamId) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.dataStreamId = dataStreamId == null ? "" : dataStreamId;
}
```
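A sketch of the effect on the cache, assuming `MultiplexerKey` implements `equals` and `hashCode` over both fields (those methods are not visible in this diff). To keep it self-contained, the endpoint is a plain `String` instead of `Endpoints.ApiServiceDescriptor`, a bare `Object` stands in for `BeamFnDataGrpcMultiplexer`, and `MultiplexerCache` is hypothetical:

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the key; the real one holds an Endpoints.ApiServiceDescriptor.
final class MultiplexerKey {
  private final String apiServiceDescriptor;
  private final String dataStreamId;

  MultiplexerKey(String apiServiceDescriptor, String dataStreamId) {
    this.apiServiceDescriptor = apiServiceDescriptor;
    // Normalize so that null and "" produce equal keys.
    this.dataStreamId = dataStreamId == null ? "" : dataStreamId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof MultiplexerKey)) {
      return false;
    }
    MultiplexerKey other = (MultiplexerKey) o;
    return apiServiceDescriptor.equals(other.apiServiceDescriptor)
        && dataStreamId.equals(other.dataStreamId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(apiServiceDescriptor, dataStreamId);
  }
}

// Hypothetical cache: one "multiplexer" (connection) per distinct key.
final class MultiplexerCache {
  private final ConcurrentMap<MultiplexerKey, Object> cache = new ConcurrentHashMap<>();

  Object get(String endpoint, String dataStreamId) {
    return cache.computeIfAbsent(new MultiplexerKey(endpoint, dataStreamId), k -> new Object());
  }

  int size() {
    return cache.size();
  }
}
```

Normalizing in the constructor also lets `equals` call `dataStreamId.equals(...)` without a separate null check.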
--
This is an automated message from the Apache Git Service.