gemini-code-assist[bot] commented on code in PR #39152:
URL: https://github.com/apache/beam/pull/39152#discussion_r3491896353


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -200,7 +200,9 @@ public Instant getTimestamp() {
 
     @Override
     public @Nullable Context getOpenTelemetryContext() {
-      return openTelemetryContext;
+      // builder may have different context set at the beginning of parDo
+      // when building WindowedValue we should take current context from storage.
+      return openTelemetryContext != Context.current() ? Context.current() : openTelemetryContext;

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Evaluating `Context.current()` dynamically inside the getter `getOpenTelemetryContext()` breaks context propagation across thread boundaries.
   
   `WindowedValue` is an immutable value representation of an element, but the new getter effectively ignores the stored field. When `openTelemetryContext` differs from `Context.current()`, it returns `Context.current()`. When they are the same object, it returns that same object. Either way, the result is always `Context.current()` on the calling thread. If `getOpenTelemetryContext()` is called on a different thread (e.g., during asynchronous processing, buffering, or when writing to a sink on a background thread), `Context.current()` returns that thread's context (or `Context.root()` if none is set). The getter then reports the background thread's context instead of the element's original context.
   
   To fix this correctness issue, the current OpenTelemetry context should be 
captured *at construction time* (e.g., in the constructor or builder of 
`WindowedValue`), and `getOpenTelemetryContext()` should simply return the 
captured field without any dynamic checks.
   
   ```suggestion
         return openTelemetryContext;
   ```
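
   As an illustration, here is a minimal, self-contained sketch of the cross-thread failure. It deliberately avoids the OpenTelemetry dependency. `Ctx` is a hypothetical `ThreadLocal`-backed stand-in for `io.opentelemetry.context.Context`, and `Element` is a hypothetical stand-in for `WindowedValue`. The element captures its context once, at construction. `dynamicContext()` mirrors the pattern in this diff, and `capturedContext()` mirrors the suggestion.

   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Hypothetical stand-in for io.opentelemetry.context.Context: a named context
   // plus a per-thread "current" slot, analogous to Context.current().
   final class Ctx {
     static final Ctx ROOT = new Ctx("root");
     private static final ThreadLocal<Ctx> CURRENT = ThreadLocal.withInitial(() -> ROOT);

     final String name;

     Ctx(String name) {
       this.name = name;
     }

     static Ctx current() {
       return CURRENT.get();
     }

     static void setCurrent(Ctx ctx) {
       CURRENT.set(ctx);
     }
   }

   // Hypothetical stand-in for WindowedValue: captures the caller's context once.
   final class Element {
     private final Ctx captured = Ctx.current();

     // Pattern in the diff: returns Ctx.current() when it differs from the field,
     // and the identical field otherwise, so it always yields the caller's context.
     Ctx dynamicContext() {
       return captured != Ctx.current() ? Ctx.current() : captured;
     }

     // Suggested pattern: return the context captured at construction time.
     Ctx capturedContext() {
       return captured;
     }
   }

   final class ContextCaptureDemo {
     // Builds an element under "element-ctx" on the calling thread, then reads
     // both getters on a pool thread whose current context is still ROOT.
     static Ctx[] readFromOtherThread() {
       Ctx.setCurrent(new Ctx("element-ctx"));
       Element element = new Element();
       Ctx.setCurrent(Ctx.ROOT);
       ExecutorService pool = Executors.newSingleThreadExecutor();
       try {
         return pool.submit(
                 () -> new Ctx[] {element.dynamicContext(), element.capturedContext()})
             .get();
       } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
       } finally {
         pool.shutdown();
       }
     }

     public static void main(String[] args) {
       Ctx[] seen = readFromOtherThread();
       System.out.println("dynamic getter:  " + seen[0].name);  // root
       System.out.println("captured getter: " + seen[1].name);  // element-ctx
     }
   }
   ```

   On a thread whose current context is the root, `dynamicContext()` reports `root`, while `capturedContext()` still reports `element-ctx`. The latter is the behavior the suggested change restores.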



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java:
##########
@@ -184,12 +187,17 @@ public void startBundle() {
 
   @Override
   public void processElement(WindowedValue<InputT> compressedElem) {
-    if (observesWindow) {
-      for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
-        invokeProcessElement(elem);
+    Context openTelemetryContext = compressedElem.getOpenTelemetryContext();
+    Scope scope =
+        Optional.ofNullable(openTelemetryContext).map(Context::makeCurrent).orElse(() -> {});
+    try (scope) {
+      if (observesWindow) {
+        for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
+          invokeProcessElement(elem);
+        }
+      } else {
+        invokeProcessElement(compressedElem);
       }
-    } else {
-      invokeProcessElement(compressedElem);
     }
   }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Routing the context through `Optional.ofNullable(...).map(Context::makeCurrent).orElse(() -> {})` adds avoidable work to every element invocation on the hot path. When a context is present, this allocates `Optional` instances on top of the real `Scope`, unless the JIT removes them through escape analysis. When the context is null, the code still pays for the `Optional` calls and for a try-with-resources block around a no-op `Scope`. Since `processElement` can run millions of times per second in high-throughput pipelines, the null case in particular should skip all of this.
   
   Instead, we can check if `openTelemetryContext` is null first, and only 
enter the try-with-resources block when a context is actually present.
   
   ```java
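     @Override
     public void processElement(WindowedValue<InputT> compressedElem) {
       Context openTelemetryContext = compressedElem.getOpenTelemetryContext();
       if (openTelemetryContext == null) {
         // Fast path: no context attached, so no Scope is created.
         processElementInternal(compressedElem);
       } else {
         // makeCurrent() installs the element's context; closing the Scope restores the previous one.
         try (Scope scope = openTelemetryContext.makeCurrent()) {
           processElementInternal(compressedElem);
         }
       }
     }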
   
     private void processElementInternal(WindowedValue<InputT> compressedElem) {
       if (observesWindow) {
         for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
           invokeProcessElement(elem);
         }
       } else {
         invokeProcessElement(compressedElem);
       }
     }
   ```
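
   To make the control flow concrete, here is a minimal, self-contained sketch of the same null-check structure. It again avoids the OpenTelemetry dependency. `TraceCtx` and its nested `Scope` are hypothetical stand-ins modeled on OpenTelemetry's `Context`/`Scope`: `makeCurrent()` installs a context on the current thread, and `close()` restores the previous one. `ContextRunner` is a hypothetical runner that records which context was current for each element.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for OpenTelemetry's Context/Scope pair.
   final class TraceCtx {
     static final TraceCtx ROOT = new TraceCtx("root");
     private static final ThreadLocal<TraceCtx> CURRENT = ThreadLocal.withInitial(() -> ROOT);

     final String name;

     TraceCtx(String name) {
       this.name = name;
     }

     static TraceCtx current() {
       return CURRENT.get();
     }

     // Installs this context on the current thread; closing the returned Scope
     // restores whatever context was current before.
     Scope makeCurrent() {
       TraceCtx previous = CURRENT.get();
       CURRENT.set(this);
       return () -> CURRENT.set(previous);
     }

     // Narrows AutoCloseable.close() so try-with-resources needs no catch block.
     interface Scope extends AutoCloseable {
       @Override
       void close();
     }
   }

   // Hypothetical runner with the suggested shape: branch on null so the
   // no-context path never creates a Scope or enters try-with-resources.
   final class ContextRunner {
     final List<String> seen = new ArrayList<>();

     void processElement(String element, TraceCtx elementCtx) {
       if (elementCtx == null) {
         processElementInternal(element);
       } else {
         try (TraceCtx.Scope scope = elementCtx.makeCurrent()) {
           processElementInternal(element);
         }
       }
     }

     // Stand-in for invokeProcessElement: records the context seen by the element.
     private void processElementInternal(String element) {
       seen.add(element + "@" + TraceCtx.current().name);
     }

     public static void main(String[] args) {
       ContextRunner runner = new ContextRunner();
       runner.processElement("a", null);
       runner.processElement("b", new TraceCtx("span-b"));
       runner.processElement("c", null);
       System.out.println(runner.seen);             // [a@root, b@span-b, c@root]
       System.out.println(TraceCtx.current().name); // root
     }
   }
   ```

   Only the element that carries a context pays for a `Scope`. The other two take the plain call, and the thread's context is back to `root` afterwards.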



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
