gemini-code-assist[bot] commented on code in PR #38863:
URL: https://github.com/apache/beam/pull/38863#discussion_r3412187796
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -166,7 +191,7 @@ private void flushInternal() {
}
Elements.Builder elements = convertBufferForTransmission();
if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
- outboundObserver.onNext(elements.build());
+ checkNotNull(outboundObserver).onNext(elements.build());
}
hasFlushedForBundle = true;
Review Comment:

`discard()` and `finishInstruction()` set `instructionId` and
`outboundObserver` to `null`. If the periodic flushing thread runs
concurrently, or is scheduled to run just afterwards, it still calls
`flushInternal()`. That call then throws an NPE, either in
`convertBufferForTransmission()` (which calls
`setInstructionId(instructionId)` with a null `instructionId`) or in
`checkNotNull(outboundObserver)`. Returning early when either field is null
prevents these exceptions.
```java
if (instructionId == null || outboundObserver == null) {
  return;
}
Elements.Builder elements = convertBufferForTransmission();
if (elements.getDataCount() > 0 || elements.getTimersCount() > 0) {
  outboundObserver.onNext(elements.build());
}
hasFlushedForBundle = true;
```
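
To make the guard concrete, here is a minimal self-contained sketch of the same pattern. `GuardedFlusher` and all of its members are hypothetical stand-ins, not Beam's classes, and the sketch assumes that the code clearing the fields and the flush path hold the same lock, which may not match the real aggregator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the aggregator; not Beam's actual class. */
final class GuardedFlusher {
  private final Object flushLock = new Object();
  private final List<String> buffer = new ArrayList<>();
  private String instructionId; // null while no bundle is active
  private Consumer<String> observer; // null while no bundle is active

  void start(String instructionId, Consumer<String> observer) {
    synchronized (flushLock) {
      this.instructionId = instructionId;
      this.observer = observer;
    }
  }

  void add(String element) {
    synchronized (flushLock) {
      buffer.add(element);
    }
  }

  /** Clears per-bundle state, as discard()/finishInstruction() are described to do. */
  void discard() {
    synchronized (flushLock) {
      instructionId = null;
      observer = null;
      buffer.clear();
    }
  }

  /** Called by the periodic flusher; returns true only if something was sent. */
  boolean flush() {
    synchronized (flushLock) {
      // Early return: a late flush after discard() becomes a no-op instead of an NPE.
      if (instructionId == null || observer == null) {
        return false;
      }
      if (buffer.isEmpty()) {
        return false;
      }
      observer.accept(instructionId + ":" + String.join(",", buffer));
      buffer.clear();
      return true;
    }
  }
}
```

In this sketch a flush that is scheduled after `discard()` simply returns `false`, so the periodic thread never dereferences the cleared fields.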
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java:
##########
@@ -44,17 +46,42 @@ public class BeamFnDataGrpcClient implements BeamFnDataClient {
private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
- private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer>
- multiplexerCache;
+ private static class MultiplexerKey {
+ private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+ private final String dataStreamId;
+
+ private MultiplexerKey(
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
+ this.apiServiceDescriptor = apiServiceDescriptor;
+ this.dataStreamId = dataStreamId;
+ }
Review Comment:

In `MultiplexerKey`, `dataStreamId` is compared using `Objects.equals`. If
one caller passes `null` and another passes `""` (empty string), the two are
treated as different keys, which creates duplicate multiplexers for the same
default stream. Normalizing a null `dataStreamId` to `""` in the constructor
ensures consistent key lookup.
```suggestion
    private MultiplexerKey(
        Endpoints.ApiServiceDescriptor apiServiceDescriptor, @Nullable String dataStreamId) {
      this.apiServiceDescriptor = apiServiceDescriptor;
      this.dataStreamId = dataStreamId == null ? "" : dataStreamId;
    }
```
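
The effect of the normalization can be checked in isolation with a small sketch. `NormalizedKey` is a hypothetical simplification: a plain `String` stands in for `ApiServiceDescriptor`, and the `equals`/`hashCode` shown here are assumptions about how such a key would be written, not the PR's code.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical simplified cache key; a String stands in for ApiServiceDescriptor. */
final class NormalizedKey {
  private final String endpoint;
  private final String dataStreamId;

  NormalizedKey(String endpoint, String dataStreamId) {
    this.endpoint = Objects.requireNonNull(endpoint);
    // Normalize so that null and "" both mean "the default stream".
    this.dataStreamId = dataStreamId == null ? "" : dataStreamId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof NormalizedKey)) {
      return false;
    }
    NormalizedKey other = (NormalizedKey) o;
    return endpoint.equals(other.endpoint) && dataStreamId.equals(other.dataStreamId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(endpoint, dataStreamId);
  }

  /** Looks up the same endpoint with null and with "" and returns the cache size. */
  static int cacheSizeAfterMixedLookups() {
    ConcurrentMap<NormalizedKey, Object> cache = new ConcurrentHashMap<>();
    cache.computeIfAbsent(new NormalizedKey("localhost:1234", null), k -> new Object());
    cache.computeIfAbsent(new NormalizedKey("localhost:1234", ""), k -> new Object());
    return cache.size();
  }
}
```

With the normalization in the constructor, both lookups hit the same entry; without it, the cache would hold two entries for the default stream.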
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -179,12 +187,13 @@ public static void main(Function<String, String> environmentVarGetter) throws Ex
} catch (Exception e) {
System.out.format("Problem loading pipeline options from file: %s%n", e.getMessage());
}
-
- System.out.format("Pipeline options %s%n", pipelineOptionsJson);
- // TODO: https://github.com/apache/beam/issues/30301
- pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount");
-
- PipelineOptions options = PipelineOptionsTranslation.fromJson(pipelineOptionsJson);
+ if (pipelineOptionsJson != null) {
+ System.out.format("Pipeline options %s%n", pipelineOptionsJson);
+ // TODO: https://github.com/apache/beam/issues/30301
+ pipelineOptionsJson = removeNestedKey(pipelineOptionsJson, "impersonateServiceAccount");
+ }
+ PipelineOptions options =
+ PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "" : pipelineOptionsJson);
Review Comment:

If `pipelineOptionsJson` is null, passing `""` (empty string) to
`PipelineOptionsTranslation.fromJson` will likely throw a JSON parsing
exception because an empty string is not valid JSON. Passing `"{}"` (empty JSON
object) is the standard way to represent empty options and is safe to parse.
```suggestion
    PipelineOptions options =
        PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "{}" : pipelineOptionsJson);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -207,34 +233,43 @@ public Elements sendOrCollectBufferedDataAndFinishOutboundStreams() {
TimerEndpoint timerKey = entry.getKey();
bufferedElements
.addTimersBuilder()
- .setInstructionId(processBundleRequestIdSupplier.get())
+ .setInstructionId(instructionId)
.setTransformId(timerKey.pTransformId)
.setTimerFamilyId(timerKey.timerFamilyId)
.setIsLast(true);
entry.getValue().resetStats();
}
+ // This is the end of the bundle so we reset state to prepare for future bundles.
if (collectElementsIfNoFlushes && !hasFlushedForBundle) {
return bufferedElements.build();
}
- outboundObserver.onNext(bufferedElements.build());
- // This is now at the end of a bundle, so we reset hasFlushedForBundle to prepare for new
- // bundles.
+ checkNotNull(outboundObserver).onNext(bufferedElements.build());
hasFlushedForBundle = false;
return null;
}
// Send the elements to the StreamObserver associated with this aggregator.
public void sendElements(Elements elements) {
- outboundObserver.onNext(elements);
+ checkNotNull(outboundObserver).onNext(elements);
}
Review Comment:

`sendElements` is called by the bundle processing thread and writes directly
to `outboundObserver` without acquiring `flushLock`. If `timeLimit > 0`, the
background flushing thread can concurrently call `flushInternal()` and write to
the same `outboundObserver`. gRPC does not require `StreamObserver`
implementations to be thread-safe, so concurrent calls to `onNext` can corrupt
the stream or crash. Synchronizing on `flushLock` when `timeLimit > 0` prevents
this race condition.
```java
public void sendElements(Elements elements) {
  if (timeLimit > 0) {
    synchronized (flushLock) {
      checkNotNull(outboundObserver).onNext(elements);
    }
  } else {
    checkNotNull(outboundObserver).onNext(elements);
  }
}
```
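
The serialization argument can be illustrated without gRPC. In the sketch below, `OverlapDetectingObserver` and `LockedSender` are hypothetical stand-ins: the observer only counts how often two threads are inside `onNext` at once, and the sender always takes the lock (it does not model the `timeLimit > 0` branch).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a non-thread-safe observer; counts overlapping onNext calls. */
final class OverlapDetectingObserver {
  private final AtomicInteger inFlight = new AtomicInteger();
  private final AtomicInteger overlaps = new AtomicInteger();
  private final AtomicInteger delivered = new AtomicInteger();

  void onNext(String element) {
    if (inFlight.incrementAndGet() > 1) {
      overlaps.incrementAndGet(); // another thread is inside onNext right now
    }
    delivered.incrementAndGet();
    inFlight.decrementAndGet();
  }

  int overlaps() {
    return overlaps.get();
  }

  int delivered() {
    return delivered.get();
  }
}

/** Hypothetical sender: both write paths take the same lock before touching the observer. */
final class LockedSender {
  private final Object flushLock = new Object();
  private final OverlapDetectingObserver observer;

  LockedSender(OverlapDetectingObserver observer) {
    this.observer = observer;
  }

  void sendElements(String elements) {
    synchronized (flushLock) {
      observer.onNext(elements);
    }
  }

  void flushInternal() {
    synchronized (flushLock) {
      observer.onNext("flushed");
    }
  }

  /** Runs a "bundle" thread and a "flusher" thread against one shared observer. */
  static OverlapDetectingObserver runDemo(int callsPerThread) {
    OverlapDetectingObserver observer = new OverlapDetectingObserver();
    LockedSender sender = new LockedSender(observer);
    Thread bundle = new Thread(() -> {
      for (int i = 0; i < callsPerThread; i++) {
        sender.sendElements("data");
      }
    });
    Thread flusher = new Thread(() -> {
      for (int i = 0; i < callsPerThread; i++) {
        sender.flushInternal();
      }
    });
    bundle.start();
    flusher.start();
    try {
      bundle.join();
      flusher.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return observer;
  }
}
```

Because every write goes through `flushLock`, the observer never sees two callers at once. Removing the `synchronized` block from `sendElements` would allow overlaps, although a given run is not guaranteed to exhibit one.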
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java:
##########
@@ -91,25 +119,32 @@ public void poisonInstructionId(String instructionId) {
}
@Override
- public BeamFnDataOutboundAggregator createOutboundAggregator(
- ApiServiceDescriptor apiServiceDescriptor,
- Supplier<String> processBundleRequestIdSupplier,
- boolean collectElementsIfNoFlushes) {
- return new BeamFnDataOutboundAggregator(
- options,
- processBundleRequestIdSupplier,
- getClientFor(apiServiceDescriptor).getOutboundObserver(),
- collectElementsIfNoFlushes);
+ public StreamObserver<Elements> getOutboundObserver(
+ ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
+ return getMultiplexer(apiServiceDescriptor, dataStreamId).getOutboundObserver();
}
- private BeamFnDataGrpcMultiplexer getClientFor(
- Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
+ private BeamFnDataGrpcMultiplexer getMultiplexer(
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor, String dataStreamId) {
+ MultiplexerKey key = new MultiplexerKey(apiServiceDescriptor, dataStreamId);
return multiplexerCache.computeIfAbsent(
- apiServiceDescriptor,
- (Endpoints.ApiServiceDescriptor descriptor) ->
- new BeamFnDataGrpcMultiplexer(
- descriptor,
- outboundObserverFactory,
- BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::data));
+ key,
+ k -> {
+ OutboundObserverFactory.BasicFactory<Elements, Elements> baseOutboundObserverFactory =
+ inboundObserver -> {
+ BeamFnDataGrpc.BeamFnDataStub stub =
+ BeamFnDataGrpc.newStub(channelFactory.apply(apiServiceDescriptor));
+ if (dataStreamId != null && !dataStreamId.isEmpty()) {
+ Metadata headers = new Metadata();
+ headers.put(
+ Metadata.Key.of("data_stream_id", Metadata.ASCII_STRING_MARSHALLER),
+ dataStreamId);
+ stub = stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
+ }
Review Comment:

With `dataStreamId` normalized in `MultiplexerKey`, we can simplify the
check in `getMultiplexer` to just `!key.dataStreamId.isEmpty()`, removing the
null check and using the normalized key field.
```suggestion
                  if (!key.dataStreamId.isEmpty()) {
                    Metadata headers = new Metadata();
                    headers.put(
                        Metadata.Key.of("data_stream_id", Metadata.ASCII_STRING_MARSHALLER),
                        key.dataStreamId);
                    stub = stub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
                  }
```