gemini-code-assist[bot] commented on code in PR #39150:
URL: https://github.com/apache/beam/pull/39150#discussion_r3491901073


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1129,11 +1188,13 @@ public PCollection<T> expand(PBegin input) {
       }
 
       if (getDeadLetterTopicProvider() != null
-          && !(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
+          && (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
         throw new IllegalArgumentException(
             "PubSubIO cannot be configured with both a dead letter topic and a bad record router");
       }

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   This change introduces a critical logic inversion bug. The original code 
threw an exception if a dead letter topic was configured AND a custom bad 
record router was used (i.e., not the default `ThrowingBadRecordRouter`). By 
changing `!(...)` to `(...)`, the code now throws an exception when a dead 
letter topic is configured with the DEFAULT `ThrowingBadRecordRouter`, which 
completely breaks the standard dead letter topic configuration for all users.
   
   ```suggestion
         if (getDeadLetterTopicProvider() != null
             && !(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
           throw new IllegalArgumentException(
               "PubSubIO cannot be configured with both a dead letter topic and a bad record router");
         }
   ```
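
To make the inversion concrete, here is a minimal standalone sketch that compares the two conditions side by side. `DeadLetterCheck`, `shouldReject`, and `shouldRejectInverted` are hypothetical names used only for illustration; they are not part of `PubsubIO`, and the booleans stand in for the real `getDeadLetterTopicProvider() != null` and `instanceof ThrowingBadRecordRouter` checks.

```java
public class DeadLetterCheck {
  // Original guard: reject only when a dead letter topic is combined with a
  // custom (non-default) bad record router.
  static boolean shouldReject(boolean hasDeadLetterTopic, boolean usesDefaultRouter) {
    return hasDeadLetterTopic && !usesDefaultRouter;
  }

  // Guard as changed in this diff: the negation is gone, so the default
  // router is the one that gets rejected.
  static boolean shouldRejectInverted(boolean hasDeadLetterTopic, boolean usesDefaultRouter) {
    return hasDeadLetterTopic && usesDefaultRouter;
  }

  public static void main(String[] args) {
    boolean[] values = {false, true};
    // Print the full truth table for both variants.
    for (boolean topic : values) {
      for (boolean defaultRouter : values) {
        System.out.printf(
            "deadLetterTopic=%b defaultRouter=%b original=%b inverted=%b%n",
            topic,
            defaultRouter,
            shouldReject(topic, defaultRouter),
            shouldRejectInverted(topic, defaultRouter));
      }
    }
  }
}
```

The row to look at is a dead letter topic with the default router: the original guard accepts it, while the inverted guard throws.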



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1079,6 +1094,50 @@ public Read<T> withIdAttribute(String idAttribute) {
       return toBuilder().setIdAttribute(idAttribute).build();
     }
 
+    public Read<T> withEnableOpenTelemetryTracing() {
+      return toBuilder().setEnableOpenTelemetryTracing(true).setNeedsAttributes(true).build();
+    }
+
+    static class OpenTelemetryHeaderConsumer extends DoFn<PubsubMessage, PubsubMessage> {
+      Context extractSpanContext(PubsubMessage message) {
+        TextMapGetter<PubsubMessage> extractMessageAttributes =
+            new TextMapGetter<PubsubMessage>() {
+              @Override
+              public String get(@Nullable PubsubMessage carrier, String key) {
+                String attribute =
+                    Preconditions.checkArgumentNotNull(carrier).getAttribute("googclient_" + key);
+                return attribute == null ? "" : attribute;
+              }
+
+              @Override
+              public Iterable<String> keys(PubsubMessage carrier) {
+                Map<String, String> attributeMap = carrier.getAttributeMap();
+                return attributeMap == null ? ImmutableList.of() : attributeMap.keySet();
+              }
+            };
+        return W3CTraceContextPropagator.getInstance()
+            .extract(Context.current(), message, extractMessageAttributes);
+      }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   There are three issues with the `TextMapGetter` implementation:
   1. If `carrier` is null, `get` will throw because of 
`Preconditions.checkArgumentNotNull(carrier)` (an `IllegalArgumentException`, 
assuming this is Beam's own `Preconditions` helper). The OpenTelemetry 
`TextMapGetter` contract marks both the carrier and the return value as 
nullable, so `get` should return `null` if the carrier is null or the key is 
not found.
   2. Returning `""` (empty string) instead of `null` when the attribute is 
missing can cause the propagator to fail or log warnings because it tries to 
parse an empty string as a traceparent.
   3. `keys` returns the raw keys from the attribute map (e.g., 
`"googclient_traceparent"`). However, `get` prepends `"googclient_"` to the 
key. If a propagator iterates over the keys returned by `keys` and calls `get`, 
it will look up `"googclient_googclient_traceparent"`. `keys` should filter and 
strip the `"googclient_"` prefix to remain consistent with `get`.
   
   ```java
         Context extractSpanContext(PubsubMessage message) {
           TextMapGetter<PubsubMessage> extractMessageAttributes =
               new TextMapGetter<PubsubMessage>() {
                 @Override
                 public @Nullable String get(@Nullable PubsubMessage carrier, String key) {
                   if (carrier == null) {
                     return null;
                   }
                   return carrier.getAttribute("googclient_" + key);
                 }
   
                 @Override
                 public Iterable<String> keys(PubsubMessage carrier) {
                   if (carrier == null) {
                     return ImmutableList.of();
                   }
                   Map<String, String> attributeMap = carrier.getAttributeMap();
                   if (attributeMap == null) {
                     return ImmutableList.of();
                   }
                   java.util.List<String> keys = new java.util.ArrayList<>();
                   for (String key : attributeMap.keySet()) {
                     if (key.startsWith("googclient_")) {
                       keys.add(key.substring("googclient_".length()));
                     }
                   }
                   return keys;
                 }
               };
           return W3CTraceContextPropagator.getInstance()
               .extract(Context.current(), message, extractMessageAttributes);
         }
   ```
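
The prefix mismatch in point 3 can be reproduced without OpenTelemetry on the classpath. The following is a minimal sketch that uses a plain `Map<String, String>` as the carrier; `PrefixedAttributes` and its methods are hypothetical stand-ins for the `TextMapGetter` callbacks, and the `traceparent` value is only a sample in the W3C format.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PrefixedAttributes {
  static final String PREFIX = "googclient_";

  // Mirrors get(): looks up the prefixed key and returns null when the
  // carrier is null or the attribute is absent.
  static String get(Map<String, String> attributes, String key) {
    return attributes == null ? null : attributes.get(PREFIX + key);
  }

  // Mirrors keys(): reports only prefixed attributes, with the prefix
  // stripped, so every returned key can be passed straight back to get().
  static List<String> keys(Map<String, String> attributes) {
    List<String> result = new ArrayList<>();
    if (attributes == null) {
      return result;
    }
    for (String key : attributes.keySet()) {
      if (key.startsWith(PREFIX)) {
        result.add(key.substring(PREFIX.length()));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> attributes = new HashMap<>();
    attributes.put(
        "googclient_traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
    attributes.put("unrelated", "value");
    // Round trip: each key reported by keys() resolves through get().
    for (String key : keys(attributes)) {
      System.out.println(key + " -> " + get(attributes, key));
    }
  }
}
```

With this shape, `keys` and `get` agree on the unprefixed key space, which is what a propagator that iterates over `keys` would presumably rely on.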



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1439,6 +1513,39 @@ abstract Builder<T> setBadRecordErrorHandler(
       abstract Write<T> build();
     }
 
+    static class OpenTelemetryHeaderPropagator extends DoFn<PubsubMessage, PubsubMessage> {
+      void injectSpanContext(Map<String, String> attr) {
+        TextMapSetter<Map<String, String>> inject =
+            new TextMapSetter<Map<String, String>>() {
+              @Override
+              public void set(@Nullable Map<String, String> attr, String key, String value) {
+                Preconditions.checkArgumentNotNull(attr).put("googclient_" + key, value);
+              }
+            };

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   `Preconditions.checkArgumentNotNull(attr)` inside the `TextMapSetter` throws 
when the carrier is null, even though the `set` signature declares `attr` as 
`@Nullable`. It is safer to check for null and skip injection when no carrier 
is available.
   
   ```java
         void injectSpanContext(Map<String, String> attr) {
           TextMapSetter<Map<String, String>> inject =
               new TextMapSetter<Map<String, String>>() {
                 @Override
                 public void set(@Nullable Map<String, String> attr, String key, String value) {
                   if (attr != null) {
                     attr.put("googclient_" + key, value);
                   }
                 }
               };
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1439,6 +1513,39 @@ abstract Builder<T> setBadRecordErrorHandler(
       abstract Write<T> build();
     }
 
+    static class OpenTelemetryHeaderPropagator extends DoFn<PubsubMessage, PubsubMessage> {
+      void injectSpanContext(Map<String, String> attr) {
+        TextMapSetter<Map<String, String>> inject =
+            new TextMapSetter<Map<String, String>>() {
+              @Override
+              public void set(@Nullable Map<String, String> attr, String key, String value) {
+                Preconditions.checkArgumentNotNull(attr).put("googclient_" + key, value);
+              }
+            };
+        W3CTraceContextPropagator.getInstance().inject(Context.current(), attr, inject);
+      }
+
+      @ProcessElement
+      public void processElement(
+          ProcessContext c,
+          @Element PubsubMessage message,
+          OutputReceiver<PubsubMessage> output,
+          PipelineOptions po) {
+        Map<String, String> attributeMap = message.getAttributeMap();
+        Map<String, String> attr = new HashMap<>(Objects.requireNonNull(attributeMap));
+        injectSpanContext(attr);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `message.getAttributeMap()` can return `null` if the PubsubMessage was 
constructed without attributes. Calling `Objects.requireNonNull(attributeMap)` 
will throw a `NullPointerException` at runtime. We should handle the null case 
gracefully by initializing an empty `HashMap`.
   
   ```suggestion
           Map<String, String> attributeMap = message.getAttributeMap();
           Map<String, String> attr =
               attributeMap == null ? new HashMap<>() : new HashMap<>(attributeMap);
           injectSpanContext(attr);
   ```
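
As a quick check of the null-tolerant copy, here is a minimal standalone sketch. `AttributeCopy` and `mutableCopy` are hypothetical names for illustration only, and the sample key is made up; the point is that a null input yields an empty, mutable map and that the copy never writes through to the original attributes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AttributeCopy {
  // Returns a fresh mutable map: empty when the message had no attributes,
  // otherwise a copy that can be modified without touching the original.
  static Map<String, String> mutableCopy(Map<String, String> attributeMap) {
    return attributeMap == null ? new HashMap<>() : new HashMap<>(attributeMap);
  }

  public static void main(String[] args) {
    // No attributes: we still get a map that trace headers can be added to.
    Map<String, String> fromNull = mutableCopy(null);
    fromNull.put("googclient_traceparent", "sample");
    System.out.println(fromNull);

    // Immutable input: the copy accepts new entries, the input stays as it was.
    Map<String, String> original = Collections.singletonMap("k", "v");
    Map<String, String> copy = mutableCopy(original);
    copy.put("googclient_traceparent", "sample");
    System.out.println(original.size() + " vs " + copy.size());
  }
}
```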



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1079,6 +1094,50 @@ public Read<T> withIdAttribute(String idAttribute) {
       return toBuilder().setIdAttribute(idAttribute).build();
     }
 
+    public Read<T> withEnableOpenTelemetryTracing() {
+      return toBuilder().setEnableOpenTelemetryTracing(true).setNeedsAttributes(true).build();
+    }
+
+    static class OpenTelemetryHeaderConsumer extends DoFn<PubsubMessage, PubsubMessage> {
+      Context extractSpanContext(PubsubMessage message) {
+        TextMapGetter<PubsubMessage> extractMessageAttributes =
+            new TextMapGetter<PubsubMessage>() {
+              @Override
+              public String get(@Nullable PubsubMessage carrier, String key) {
+                String attribute =
+                    Preconditions.checkArgumentNotNull(carrier).getAttribute("googclient_" + key);
+                return attribute == null ? "" : attribute;
+              }
+
+              @Override
+              public Iterable<String> keys(PubsubMessage carrier) {
+                Map<String, String> attributeMap = carrier.getAttributeMap();
+                return attributeMap == null ? ImmutableList.of() : attributeMap.keySet();
+              }
+            };
+        return W3CTraceContextPropagator.getInstance()
+            .extract(Context.current(), message, extractMessageAttributes);
+      }
+
+      @ProcessElement
+      public void processElement(
+          ProcessContext c,
+          @Element PubsubMessage message,
+          OutputReceiver<PubsubMessage> output,
+          PipelineOptions po) {
+        Context context = extractSpanContext(message);
+        TracerProvider tracerProvider =
+            po.as(OpenTelemetryTracingOptions.class).getTracerProvider();
+        Tracer tracer = Preconditions.checkArgumentNotNull(tracerProvider).get("PubSubIO");
+        Span psSub = tracer.spanBuilder("Process Element").setParent(context).startSpan();
+        try (Scope s = context.makeCurrent()) {
+          output.output(message);
+        } finally {

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The scope is being created using `context.makeCurrent()` instead of 
`psSub.makeCurrent()`. This means the newly created span `psSub` is never made 
current, and any downstream operations or nested spans created during 
`output.output(message)` will not be children of `psSub` ("Process Element"). 
Instead, they will be direct children of the extracted parent `context`.
   
   ```suggestion
           try (Scope s = psSub.makeCurrent()) {
             output.output(message);
           } finally {
   ```


