gemini-code-assist[bot] commented on code in PR #39151:
URL: https://github.com/apache/beam/pull/39151#discussion_r3491885616


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2218,6 +2254,76 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     }
   }
 
+  static class OpenTelemetryHeaderConsumer<K, V>
+      extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
+    @Nullable Tracer trace = null;
+
+    @Setup
+    public void setup(PipelineOptions options) {
+      // inject tracer via options
+      io.opentelemetry.api.OpenTelemetry openTelemetry =
+          options.as(SdkHarnessOptions.class).getOpenTelemetry();
+      if (openTelemetry != null) {
+        trace = openTelemetry.getTracer("KafkaIO");
+      }
+    }
+
+    Context extractSpanContext(KafkaRecord<K, V> message) {
+      TextMapGetter<KafkaRecord<K, V>> extractMessageAttributes =
+          new TextMapGetter<KafkaRecord<K, V>>() {
+
+            @Override
+            public @Nullable String get(@Nullable KafkaRecord<K, V> carrier, 
String key) {
+
+              Headers headers = 
Preconditions.checkArgumentNotNull(carrier).getHeaders();
+              if (headers == null) return null;
+              Header header = headers.lastHeader(key);
+              if (header == null) return null;
+              return new String(header.value(), UTF_8);
+            }
+
+            @Override
+            public Iterable<String> keys(KafkaRecord<K, V> carrier) {
+              if (carrier.getHeaders() == null) return ImmutableList.of();
+              return StreamSupport.stream(carrier.getHeaders().spliterator(), 
false)
+                  .map(Header::key)
+                  .collect(Collectors.toList());
+            }
+          };
+      return W3CTraceContextPropagator.getInstance()
+          .extract(Context.current(), message, extractMessageAttributes);
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `TextMapGetter` implementation should be fully null-safe. The `carrier` 
parameter of `get` is annotated `@Nullable`, and `keys` dereferences its 
`carrier` without a null check, which can lead to a `NullPointerException`. 
Additionally, `Preconditions.checkArgumentNotNull` in `get` throws on a null 
carrier, whereas the getter should gracefully return `null` (from `get`) or an 
empty list (from `keys`).
   
   ```java
       Context extractSpanContext(KafkaRecord<K, V> message) {
         TextMapGetter<KafkaRecord<K, V>> extractMessageAttributes =
             new TextMapGetter<KafkaRecord<K, V>>() {
   
               @Override
               public @Nullable String get(@Nullable KafkaRecord<K, V> carrier, 
String key) {
                 if (carrier == null) {
                   return null;
                 }
                 Headers headers = carrier.getHeaders();
                 if (headers == null) {
                   return null;
                 }
                 Header header = headers.lastHeader(key);
                 if (header == null) {
                   return null;
                 }
                 return new String(header.value(), UTF_8);
               }
   
               @Override
               public Iterable<String> keys(@Nullable KafkaRecord<K, V> 
carrier) {
                 if (carrier == null || carrier.getHeaders() == null) {
                   return ImmutableList.of();
                 }
                 return 
StreamSupport.stream(carrier.getHeaders().spliterator(), false)
                     .map(Header::key)
                     .collect(Collectors.toList());
               }
             };
         return W3CTraceContextPropagator.getInstance()
             .extract(Context.current(), message, extractMessageAttributes);
       }
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2218,6 +2254,76 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     }
   }
 
+  static class OpenTelemetryHeaderConsumer<K, V>
+      extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
+    @Nullable Tracer trace = null;
+
+    @Setup
+    public void setup(PipelineOptions options) {
+      // inject tracer via options
+      io.opentelemetry.api.OpenTelemetry openTelemetry =
+          options.as(SdkHarnessOptions.class).getOpenTelemetry();
+      if (openTelemetry != null) {
+        trace = openTelemetry.getTracer("KafkaIO");
+      }
+    }
+
+    Context extractSpanContext(KafkaRecord<K, V> message) {
+      TextMapGetter<KafkaRecord<K, V>> extractMessageAttributes =
+          new TextMapGetter<KafkaRecord<K, V>>() {
+
+            @Override
+            public @Nullable String get(@Nullable KafkaRecord<K, V> carrier, 
String key) {
+
+              Headers headers = 
Preconditions.checkArgumentNotNull(carrier).getHeaders();
+              if (headers == null) return null;
+              Header header = headers.lastHeader(key);
+              if (header == null) return null;
+              return new String(header.value(), UTF_8);
+            }
+
+            @Override
+            public Iterable<String> keys(KafkaRecord<K, V> carrier) {
+              if (carrier.getHeaders() == null) return ImmutableList.of();
+              return StreamSupport.stream(carrier.getHeaders().spliterator(), 
false)
+                  .map(Header::key)
+                  .collect(Collectors.toList());
+            }
+          };
+      return W3CTraceContextPropagator.getInstance()
+          .extract(Context.current(), message, extractMessageAttributes);
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KafkaRecord<K, V> element, OutputReceiver<KafkaRecord<K, V>> 
receiver) {
+      Context context = extractSpanContext(element);
+      try (Scope ignored = context.makeCurrent()) {
+        receiver.output(element);
+      }
+    }
+  }
+
+  static class OpenTelemetryHeaderPropagator<K, V>
+      extends DoFn<ProducerRecord<K, V>, ProducerRecord<K, V>> {
+    void injectTraceContext(ProducerRecord<K, V> message) {
+      TextMapSetter<ProducerRecord<K, V>> injectMessageAttributes =
+          (carrier, key, value) -> {
+            Preconditions.checkArgumentNotNull(carrier);
+            Objects.requireNonNull(carrier.headers()).add(key, 
value.getBytes(UTF_8));
+          };
+      W3CTraceContextPropagator.getInstance()
+          .inject(Context.current(), message, injectMessageAttributes);
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element ProducerRecord<K, V> element, 
OutputReceiver<ProducerRecord<K, V>> receiver) {
+      injectTraceContext(element);
+      receiver.output(element);
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   In Apache Beam, mutating input elements inside a `DoFn` is a violation of 
the programming model and can lead to severe correctness issues, especially if 
elements are retried or processed by multiple downstream steps. 
`ProducerRecord`'s headers should not be mutated in-place. Instead, copy the 
headers to a new `RecordHeaders` instance, inject the trace context, and return 
a new `ProducerRecord`.
   
   ```java
       ProducerRecord<K, V> injectTraceContext(ProducerRecord<K, V> message) {
         org.apache.kafka.common.header.internals.RecordHeaders headers =
             new 
org.apache.kafka.common.header.internals.RecordHeaders(message.headers());
         TextMapSetter<org.apache.kafka.common.header.internals.RecordHeaders> 
injectMessageAttributes =
             (carrier, key, value) -> {
               if (carrier != null) {
                 carrier.add(key, value.getBytes(UTF_8));
               }
             };
         W3CTraceContextPropagator.getInstance()
             .inject(Context.current(), headers, injectMessageAttributes);
         return new ProducerRecord<>(
             message.topic(),
             message.partition(),
             message.timestamp(),
             message.key(),
             message.value(),
             headers);
       }
   
       @ProcessElement
       public void processElement(
           @Element ProducerRecord<K, V> element, 
OutputReceiver<ProducerRecord<K, V>> receiver) {
         receiver.output(injectTraceContext(element));
       }
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2218,6 +2254,76 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     }
   }
 
+  static class OpenTelemetryHeaderConsumer<K, V>
+      extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
+    @Nullable Tracer trace = null;
+
+    @Setup
+    public void setup(PipelineOptions options) {
+      // inject tracer via options
+      io.opentelemetry.api.OpenTelemetry openTelemetry =
+          options.as(SdkHarnessOptions.class).getOpenTelemetry();
+      if (openTelemetry != null) {
+        trace = openTelemetry.getTracer("KafkaIO");
+      }
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `trace` field is assigned in the `@Setup` method but never read in 
`OpenTelemetryHeaderConsumer`. Since context extraction relies on the 
`W3CTraceContextPropagator` singleton and does not use the `Tracer` instance, 
both the field and the `@Setup` method can be safely removed to keep the code 
clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to