gemini-code-assist[bot] commented on code in PR #39151:
URL: https://github.com/apache/beam/pull/39151#discussion_r3491885616
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2218,6 +2254,76 @@ public void populateDisplayData(DisplayData.Builder
builder) {
}
}
+ static class OpenTelemetryHeaderConsumer<K, V>
+ extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
+ @Nullable Tracer trace = null;
+
+ @Setup
+ public void setup(PipelineOptions options) {
+ // inject tracer via options
+ io.opentelemetry.api.OpenTelemetry openTelemetry =
+ options.as(SdkHarnessOptions.class).getOpenTelemetry();
+ if (openTelemetry != null) {
+ trace = openTelemetry.getTracer("KafkaIO");
+ }
+ }
+
+ Context extractSpanContext(KafkaRecord<K, V> message) {
+ TextMapGetter<KafkaRecord<K, V>> extractMessageAttributes =
+ new TextMapGetter<KafkaRecord<K, V>>() {
+
+ @Override
+ public @Nullable String get(@Nullable KafkaRecord<K, V> carrier,
String key) {
+
+ Headers headers =
Preconditions.checkArgumentNotNull(carrier).getHeaders();
+ if (headers == null) return null;
+ Header header = headers.lastHeader(key);
+ if (header == null) return null;
+ return new String(header.value(), UTF_8);
+ }
+
+ @Override
+ public Iterable<String> keys(KafkaRecord<K, V> carrier) {
+ if (carrier.getHeaders() == null) return ImmutableList.of();
+ return StreamSupport.stream(carrier.getHeaders().spliterator(),
false)
+ .map(Header::key)
+ .collect(Collectors.toList());
+ }
+ };
+ return W3CTraceContextPropagator.getInstance()
+ .extract(Context.current(), message, extractMessageAttributes);
+ }
Review Comment:

The `TextMapGetter` implementation should be fully null-safe. The `TextMapGetter`
interface declares the `carrier` parameter of `get` as `@Nullable`, so
dereferencing it without a null check can throw a `NullPointerException`; `keys`
should be guarded the same way for robustness. Additionally,
`Preconditions.checkArgumentNotNull` throws on a null carrier, whereas a getter
should gracefully return `null` (or, for `keys`, an empty list).
```java
/**
 * Extracts the W3C trace context carried in the headers of a consumed Kafka record.
 *
 * <p>The getter is fully null-safe: a null carrier, a record without headers, a missing header,
 * or a header whose value is null all yield "no value" instead of throwing.
 *
 * @param message the consumed record whose headers may carry the trace context
 * @return {@link Context#current()} combined with whatever span context could be extracted
 */
Context extractSpanContext(KafkaRecord<K, V> message) {
  TextMapGetter<KafkaRecord<K, V>> extractMessageAttributes =
      new TextMapGetter<KafkaRecord<K, V>>() {
        @Override
        public @Nullable String get(@Nullable KafkaRecord<K, V> carrier, String key) {
          if (carrier == null) {
            return null;
          }
          Headers headers = carrier.getHeaders();
          if (headers == null) {
            return null;
          }
          Header header = headers.lastHeader(key);
          if (header == null) {
            return null;
          }
          // Kafka permits headers with a null value; treat them as absent rather than
          // letting new String(null, UTF_8) throw.
          byte[] value = header.value();
          if (value == null) {
            return null;
          }
          return new String(value, UTF_8);
        }

        @Override
        public Iterable<String> keys(@Nullable KafkaRecord<K, V> carrier) {
          // Read the headers once so the null check and the iteration use the same object.
          Headers headers = carrier == null ? null : carrier.getHeaders();
          if (headers == null) {
            return ImmutableList.of();
          }
          return StreamSupport.stream(headers.spliterator(), false)
              .map(Header::key)
              .collect(Collectors.toList());
        }
      };
  return W3CTraceContextPropagator.getInstance()
      .extract(Context.current(), message, extractMessageAttributes);
}
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2218,6 +2254,76 @@ public void populateDisplayData(DisplayData.Builder
builder) {
}
}
+ static class OpenTelemetryHeaderConsumer<K, V>
+ extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
+ @Nullable Tracer trace = null;
+
+ @Setup
+ public void setup(PipelineOptions options) {
+ // inject tracer via options
+ io.opentelemetry.api.OpenTelemetry openTelemetry =
+ options.as(SdkHarnessOptions.class).getOpenTelemetry();
+ if (openTelemetry != null) {
+ trace = openTelemetry.getTracer("KafkaIO");
+ }
+ }
+
+ Context extractSpanContext(KafkaRecord<K, V> message) {
+ TextMapGetter<KafkaRecord<K, V>> extractMessageAttributes =
+ new TextMapGetter<KafkaRecord<K, V>>() {
+
+ @Override
+ public @Nullable String get(@Nullable KafkaRecord<K, V> carrier,
String key) {
+
+ Headers headers =
Preconditions.checkArgumentNotNull(carrier).getHeaders();
+ if (headers == null) return null;
+ Header header = headers.lastHeader(key);
+ if (header == null) return null;
+ return new String(header.value(), UTF_8);
+ }
+
+ @Override
+ public Iterable<String> keys(KafkaRecord<K, V> carrier) {
+ if (carrier.getHeaders() == null) return ImmutableList.of();
+ return StreamSupport.stream(carrier.getHeaders().spliterator(),
false)
+ .map(Header::key)
+ .collect(Collectors.toList());
+ }
+ };
+ return W3CTraceContextPropagator.getInstance()
+ .extract(Context.current(), message, extractMessageAttributes);
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KafkaRecord<K, V> element, OutputReceiver<KafkaRecord<K, V>>
receiver) {
+ Context context = extractSpanContext(element);
+ try (Scope ignored = context.makeCurrent()) {
+ receiver.output(element);
+ }
+ }
+ }
+
+ static class OpenTelemetryHeaderPropagator<K, V>
+ extends DoFn<ProducerRecord<K, V>, ProducerRecord<K, V>> {
+ void injectTraceContext(ProducerRecord<K, V> message) {
+ TextMapSetter<ProducerRecord<K, V>> injectMessageAttributes =
+ (carrier, key, value) -> {
+ Preconditions.checkArgumentNotNull(carrier);
+ Objects.requireNonNull(carrier.headers()).add(key,
value.getBytes(UTF_8));
+ };
+ W3CTraceContextPropagator.getInstance()
+ .inject(Context.current(), message, injectMessageAttributes);
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element ProducerRecord<K, V> element,
OutputReceiver<ProducerRecord<K, V>> receiver) {
+ injectTraceContext(element);
+ receiver.output(element);
+ }
Review Comment:

In Apache Beam, mutating input elements inside a `DoFn` violates the programming
model and can lead to severe correctness issues, especially when elements are
retried or consumed by multiple downstream transforms. The headers of the
incoming `ProducerRecord` should therefore not be mutated in place. Instead, copy
them into a new `RecordHeaders` instance, inject the trace context into that
copy, and output a new `ProducerRecord`.
```java
/**
 * Returns a copy of {@code message} whose headers also carry the W3C trace context of
 * {@link Context#current()}.
 *
 * <p>The input record is never mutated, because a {@code DoFn} must not modify its input
 * elements. Instead, its headers are copied into a fresh {@code RecordHeaders} and a new record
 * is built around them.
 *
 * @param message the record about to be produced
 * @return a new record with the same topic, partition, timestamp, key and value, whose headers
 *     contain the propagated trace context
 */
ProducerRecord<K, V> injectTraceContext(ProducerRecord<K, V> message) {
  Headers headers =
      new org.apache.kafka.common.header.internals.RecordHeaders(message.headers());
  TextMapSetter<Headers> injectMessageAttributes =
      (carrier, key, value) -> {
        if (carrier == null) {
          return;
        }
        // TextMapSetter#set must *replace* the field. Drop any value already present
        // (e.g. a traceparent set by an upstream producer) so the record carries exactly
        // one value per trace header.
        carrier.remove(key);
        carrier.add(key, value.getBytes(UTF_8));
      };
  W3CTraceContextPropagator.getInstance()
      .inject(Context.current(), headers, injectMessageAttributes);
  return new ProducerRecord<>(
      message.topic(),
      message.partition(),
      message.timestamp(),
      message.key(),
      message.value(),
      headers);
}
/**
 * Outputs a copy of each incoming record enriched with the current trace context.
 *
 * @param element the record to forward; it is not modified
 * @param receiver receives the enriched copy
 */
@ProcessElement
public void processElement(
    @Element ProducerRecord<K, V> element, OutputReceiver<ProducerRecord<K, V>> receiver) {
  ProducerRecord<K, V> traced = injectTraceContext(element);
  receiver.output(traced);
}
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2218,6 +2254,76 @@ public void populateDisplayData(DisplayData.Builder
builder) {
}
}
+ static class OpenTelemetryHeaderConsumer<K, V>
+ extends DoFn<KafkaRecord<K, V>, KafkaRecord<K, V>> {
+ @Nullable Tracer trace = null;
+
+ @Setup
+ public void setup(PipelineOptions options) {
+ // inject tracer via options
+ io.opentelemetry.api.OpenTelemetry openTelemetry =
+ options.as(SdkHarnessOptions.class).getOpenTelemetry();
+ if (openTelemetry != null) {
+ trace = openTelemetry.getTracer("KafkaIO");
+ }
+ }
Review Comment:

The `trace` field and the `@Setup` method are unused in
`OpenTelemetryHeaderConsumer`: `@Setup` only populates `trace`, and nothing ever
reads it. Context extraction relies solely on the `W3CTraceContextPropagator`
singleton and never uses the `Tracer`, so both can be safely removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]