gemini-code-assist[bot] commented on code in PR #39152:
URL: https://github.com/apache/beam/pull/39152#discussion_r3491896353
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -200,7 +200,9 @@ public Instant getTimestamp() {
@Override
public @Nullable Context getOpenTelemetryContext() {
- return openTelemetryContext;
+ // builder may have different context set at the beginning of parDo
+ // when building WindowedValue we should take current context from storage.
+ return openTelemetryContext != Context.current() ? Context.current() : openTelemetryContext;
Review Comment:

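Dynamically evaluating `Context.current()` inside the getter
`getOpenTelemetryContext()` is problematic and breaks context
propagation across thread boundaries. In fact, the ternary always yields the
calling thread's `Context.current()`: when the two are the same object, it
returns the field, which is that same object. The stored
`openTelemetryContext` therefore never decides the result.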

`WindowedValue` is an immutable value representation of an element. If
`getOpenTelemetryContext()` is called on a different thread (e.g., during
asynchronous processing, buffering, or when a sink writes on a background
thread), `Context.current()` returns that thread's context (often
`Context.root()`), which differs from `openTelemetryContext`. As a result,
the getter returns the background thread's context instead of the
element's original context.

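
To make the thread-boundary problem concrete, here is a minimal, self-contained sketch. It does not use OpenTelemetry: `DemoContext` is a hypothetical stand-in backed by a `ThreadLocal` (OpenTelemetry's default context storage is also thread-local), and `DynamicGetterValue` copies the getter logic from the diff. Reading the context from a pool thread yields that thread's root context rather than the element's.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for an OpenTelemetry-style context; compared by identity, as in the diff. */
final class DemoContext {
  static final DemoContext ROOT = new DemoContext("root");
  // Each thread starts with ROOT, like a fresh thread with no context attached.
  private static final ThreadLocal<DemoContext> CURRENT = ThreadLocal.withInitial(() -> ROOT);

  final String name;

  DemoContext(String name) {
    this.name = name;
  }

  /** Stand-in for Context.current(): reads the calling thread's storage. */
  static DemoContext current() {
    return CURRENT.get();
  }

  static void setCurrent(DemoContext ctx) {
    CURRENT.set(ctx);
  }
}

/** Copies the getter logic from the diff under review. */
final class DynamicGetterValue {
  private final DemoContext openTelemetryContext;

  DynamicGetterValue(DemoContext openTelemetryContext) {
    this.openTelemetryContext = openTelemetryContext;
  }

  DemoContext getOpenTelemetryContext() {
    // Always ends up returning the *caller's* current context.
    return openTelemetryContext != DemoContext.current() ? DemoContext.current() : openTelemetryContext;
  }
}

final class ThreadBoundaryDemo {
  /** Creates a value under "element-ctx", then calls the getter from a pool thread. */
  static String contextSeenOnOtherThread() {
    DemoContext elementCtx = new DemoContext("element-ctx");
    DemoContext.setCurrent(elementCtx);
    DynamicGetterValue value = new DynamicGetterValue(elementCtx);
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Callable<DemoContext> read = value::getOpenTelemetryContext;
      return pool.submit(read).get().name;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
      DemoContext.setCurrent(DemoContext.ROOT);
    }
  }

  public static void main(String[] args) {
    System.out.println(contextSeenOnOtherThread()); // prints "root", not "element-ctx"
  }
}
```
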
To fix this correctness issue, the current OpenTelemetry context should be
captured *at construction time* (e.g., in the constructor or builder of
`WindowedValue`), and `getOpenTelemetryContext()` should simply return the
captured field without any dynamic checks.
```suggestion
return openTelemetryContext;
```
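
A minimal sketch of the suggested alternative, again with a hypothetical `ThreadLocal`-backed stand-in (`StoredContext`) instead of the real OpenTelemetry `Context`. The hypothetical `CapturedValue.Builder.build()` snapshots the building thread's current context exactly once, and the getter is a plain field read, so a reader on another thread still sees the element's context.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for thread-local context storage (not the OpenTelemetry API). */
final class StoredContext {
  private static final ThreadLocal<String> CURRENT = ThreadLocal.withInitial(() -> "root");

  static String current() {
    return CURRENT.get();
  }

  static void setCurrent(String ctx) {
    CURRENT.set(ctx);
  }
}

/** Immutable value whose context is captured once, when the value is built. */
final class CapturedValue<T> {
  private final T value;
  private final String context;

  private CapturedValue(T value, String context) {
    this.value = value;
    this.context = context;
  }

  T getValue() {
    return value;
  }

  /** Plain field read: returns the same context no matter which thread calls it. */
  String getContext() {
    return context;
  }

  static final class Builder<T> {
    private T value;

    Builder<T> setValue(T value) {
      this.value = value;
      return this;
    }

    /** Snapshots the building thread's current context instead of reading it lazily later. */
    CapturedValue<T> build() {
      return new CapturedValue<>(value, StoredContext.current());
    }
  }
}

final class CaptureDemo {
  /** Builds a value under "element-ctx", then reads its context from a pool thread. */
  static String contextSeenOnOtherThread() {
    StoredContext.setCurrent("element-ctx");
    CapturedValue<String> value = new CapturedValue.Builder<String>().setValue("payload").build();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Callable<String> read = value::getContext;
      return pool.submit(read).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
      StoredContext.setCurrent("root");
    }
  }

  public static void main(String[] args) {
    System.out.println(contextSeenOnOtherThread()); // prints "element-ctx"
  }
}
```

This sketch always takes the building thread's context. Whether a context set explicitly on the builder (for example, one copied from the input element) should take precedence is a separate design decision.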
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java:
##########
@@ -184,12 +187,17 @@ public void startBundle() {
@Override
public void processElement(WindowedValue<InputT> compressedElem) {
- if (observesWindow) {
- for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
- invokeProcessElement(elem);
+ Context openTelemetryContext = compressedElem.getOpenTelemetryContext();
+ Scope scope =
+ Optional.ofNullable(openTelemetryContext).map(Context::makeCurrent).orElse(() -> {});
+ try (scope) {
+ if (observesWindow) {
+ for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
+ invokeProcessElement(elem);
+ }
+ } else {
+ invokeProcessElement(compressedElem);
}
- } else {
- invokeProcessElement(compressedElem);
}
}
Review Comment:

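Wrapping the context in `Optional.ofNullable(...).map(Context::makeCurrent)`,
with a no-op `() -> {}` lambda as the fallback `Scope`, adds avoidable work to
the hot path. For each element that carries a context, the `Optional` chain
can allocate two wrapper objects in addition to the `Scope` returned by
`makeCurrent()`, and every element pays for the extra indirection and the
try-with-resources block. (When the context is null the cost is smaller,
since OpenJDK reuses a shared empty `Optional` and typically caches
non-capturing lambdas.) Because `processElement` runs once per element and
can be invoked millions of times per second in high-throughput pipelines,
this overhead is worth avoiding.

Instead, we can check whether `openTelemetryContext` is null first and enter
the try-with-resources block only when a context is actually present.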
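```java
@Override
public void processElement(WindowedValue<InputT> compressedElem) {
  Context openTelemetryContext = compressedElem.getOpenTelemetryContext();
  if (openTelemetryContext == null) {
    // Fast path: no context to install, so no Scope and no try-with-resources.
    processElementInternal(compressedElem);
  } else {
    // Make the element's context current while processing; closing the
    // Scope restores whatever context was current before.
    try (Scope scope = openTelemetryContext.makeCurrent()) {
      processElementInternal(compressedElem);
    }
  }
}

private void processElementInternal(WindowedValue<InputT> compressedElem) {
  if (observesWindow) {
    for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
      invokeProcessElement(elem);
    }
  } else {
    invokeProcessElement(compressedElem);
  }
}
```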
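
The control flow of this fast path can be exercised without Beam or OpenTelemetry using hypothetical stand-ins: `ScopedContext.makeCurrent` mimics `Context.makeCurrent()` by installing a thread-local value and returning a `DemoScope` that restores the previous one, and `FastPathRunner` records which context each element saw. This is a sketch of the pattern only, not the `SimpleDoFnRunner` implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical scope handle; close() declares no checked exception, like OpenTelemetry's Scope. */
interface DemoScope extends AutoCloseable {
  @Override
  void close();
}

/** Hypothetical stand-in for thread-local context storage with restorable scopes. */
final class ScopedContext {
  private static final ThreadLocal<String> CURRENT = ThreadLocal.withInitial(() -> "root");

  static String current() {
    return CURRENT.get();
  }

  /** Installs ctx and returns a scope that puts the previous context back on close. */
  static DemoScope makeCurrent(String ctx) {
    String previous = CURRENT.get();
    CURRENT.set(ctx);
    return () -> CURRENT.set(previous);
  }
}

final class FastPathRunner {
  /** Records "element@context" for each processed element (stand-in for invokeProcessElement). */
  final List<String> seen = new ArrayList<>();

  void processElement(String element, String elementContext) {
    if (elementContext == null) {
      // Fast path: nothing to install, no scope object, no try-with-resources.
      processElementInternal(element);
    } else {
      try (DemoScope scope = ScopedContext.makeCurrent(elementContext)) {
        processElementInternal(element);
      } // closing the scope restores the previous context
    }
  }

  private void processElementInternal(String element) {
    seen.add(element + "@" + ScopedContext.current());
  }

  public static void main(String[] args) {
    FastPathRunner runner = new FastPathRunner();
    runner.processElement("a", "trace-1");
    runner.processElement("b", null);
    System.out.println(runner.seen); // [a@trace-1, b@root]
    System.out.println(ScopedContext.current()); // root
  }
}
```
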
--
This is an automated message from the Apache Git Service.