gemini-code-assist[bot] commented on code in PR #39150:
URL: https://github.com/apache/beam/pull/39150#discussion_r3491901073
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1129,11 +1188,13 @@ public PCollection<T> expand(PBegin input) {
}
if (getDeadLetterTopicProvider() != null
- && !(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
+ && (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
throw new IllegalArgumentException(
"PubSubIO cannot be configured with both a dead letter topic and a bad record router");
}
Review Comment:

This change introduces a critical logic inversion bug. The original code
threw an exception if a dead letter topic was configured AND a custom bad
record router was used (i.e., not the default `ThrowingBadRecordRouter`). By
changing `!(...)` to `(...)`, the code now throws when a dead letter topic is
configured with the DEFAULT `ThrowingBadRecordRouter`, which breaks the
standard dead letter topic configuration. It also stops rejecting the
combination the check was meant to catch: a dead letter topic together with a
custom bad record router.
```suggestion
      if (getDeadLetterTopicProvider() != null
          && !(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
        throw new IllegalArgumentException(
            "PubSubIO cannot be configured with both a dead letter topic and a bad record router");
      }
```
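To make the four cases of this guard concrete, here is a minimal, self-contained sketch. The `BadRecordRouter`, `ThrowingBadRecordRouter`, and `CustomBadRecordRouter` types below are hypothetical stand-ins rather than Beam's classes, and the topic is modelled as a plain nullable `String`; only the boolean condition is taken from the suggestion above.

```java
// Hypothetical stand-ins for Beam's types, used only to exercise the condition.
final class DeadLetterValidationSketch {
  interface BadRecordRouter {}

  static final class ThrowingBadRecordRouter implements BadRecordRouter {}

  static final class CustomBadRecordRouter implements BadRecordRouter {}

  /** Rejects only a dead letter topic combined with a non-default router. */
  static void validate(String deadLetterTopic, BadRecordRouter router) {
    if (deadLetterTopic != null && !(router instanceof ThrowingBadRecordRouter)) {
      throw new IllegalArgumentException(
          "PubSubIO cannot be configured with both a dead letter topic and a bad record router");
    }
  }

  /** Returns true if validate() rejects the given configuration. */
  static boolean rejects(String deadLetterTopic, BadRecordRouter router) {
    try {
      validate(deadLetterTopic, router);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    String topic = "projects/p/topics/dlq"; // illustrative value only
    System.out.println(rejects(topic, new ThrowingBadRecordRouter())); // false
    System.out.println(rejects(topic, new CustomBadRecordRouter())); // true
    System.out.println(rejects(null, new ThrowingBadRecordRouter())); // false
    System.out.println(rejects(null, new CustomBadRecordRouter())); // false
  }
}
```

With the negation removed, the first two results would swap, which is the regression described in this comment.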
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1079,6 +1094,50 @@ public Read<T> withIdAttribute(String idAttribute) {
return toBuilder().setIdAttribute(idAttribute).build();
}
+ public Read<T> withEnableOpenTelemetryTracing() {
+ return toBuilder().setEnableOpenTelemetryTracing(true).setNeedsAttributes(true).build();
+ }
+
+ static class OpenTelemetryHeaderConsumer extends DoFn<PubsubMessage, PubsubMessage> {
+ Context extractSpanContext(PubsubMessage message) {
+ TextMapGetter<PubsubMessage> extractMessageAttributes =
+ new TextMapGetter<PubsubMessage>() {
+ @Override
+ public String get(@Nullable PubsubMessage carrier, String key) {
+ String attribute =
+ Preconditions.checkArgumentNotNull(carrier).getAttribute("googclient_" + key);
+ return attribute == null ? "" : attribute;
+ }
+
+ @Override
+ public Iterable<String> keys(PubsubMessage carrier) {
+ Map<String, String> attributeMap = carrier.getAttributeMap();
+ return attributeMap == null ? ImmutableList.of() : attributeMap.keySet();
+ }
+ };
+ return W3CTraceContextPropagator.getInstance()
+ .extract(Context.current(), message, extractMessageAttributes);
+ }
Review Comment:

There are three issues with the `TextMapGetter` implementation:
1. If `carrier` is null, `get` will throw instead of returning, because
`Preconditions.checkArgumentNotNull(carrier)` rejects null. The `TextMapGetter`
contract marks the carrier as nullable, and `get` should return `null` if the
carrier is null or the key is not found.
2. Returning `""` (empty string) instead of `null` when the attribute is
missing can cause the propagator to fail or log warnings because it tries to
parse an empty string as a traceparent.
3. `keys` returns the raw keys from the attribute map (e.g.,
`"googclient_traceparent"`). However, `get` prepends `"googclient_"` to the
key. If a propagator iterates over the keys returned by `keys` and calls `get`,
it will look up `"googclient_googclient_traceparent"`. `keys` should filter and
strip the `"googclient_"` prefix to remain consistent with `get`.
```java
Context extractSpanContext(PubsubMessage message) {
  TextMapGetter<PubsubMessage> extractMessageAttributes =
      new TextMapGetter<PubsubMessage>() {
        @Override
        public @Nullable String get(@Nullable PubsubMessage carrier, String key) {
          // A null result tells the propagator that no value is present.
          if (carrier == null) {
            return null;
          }
          return carrier.getAttribute("googclient_" + key);
        }

        @Override
        public Iterable<String> keys(PubsubMessage carrier) {
          if (carrier == null) {
            return ImmutableList.of();
          }
          Map<String, String> attributeMap = carrier.getAttributeMap();
          if (attributeMap == null) {
            return ImmutableList.of();
          }
          // Expose only prefixed attributes, with the prefix stripped, so that
          // every key returned here can be passed back to get().
          java.util.List<String> keys = new java.util.ArrayList<>();
          for (String key : attributeMap.keySet()) {
            if (key.startsWith("googclient_")) {
              keys.add(key.substring("googclient_".length()));
            }
          }
          return keys;
        }
      };
  return W3CTraceContextPropagator.getInstance()
      .extract(Context.current(), message, extractMessageAttributes);
}
```
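The consistency requirement between `get` and `keys` can be checked without OpenTelemetry on the classpath. The sketch below is an illustration under assumptions: it uses a plain `Map<String, String>` as the carrier in place of `PubsubMessage`, and the class name `PrefixedGetterSketch` and the sample attribute values are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Map model of the getter; not the OpenTelemetry TextMapGetter interface itself.
final class PrefixedGetterSketch {
  static final String PREFIX = "googclient_";

  /** Returns the prefixed attribute, or null when the carrier or the attribute is absent. */
  static String get(Map<String, String> carrier, String key) {
    return carrier == null ? null : carrier.get(PREFIX + key);
  }

  /** Returns only prefixed keys, with the prefix stripped, so each one round-trips via get(). */
  static List<String> keys(Map<String, String> carrier) {
    List<String> result = new ArrayList<>();
    if (carrier == null) {
      return result;
    }
    for (String key : carrier.keySet()) {
      if (key.startsWith(PREFIX)) {
        result.add(key.substring(PREFIX.length()));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> attributes = new HashMap<>();
    attributes.put("googclient_traceparent", "sample-traceparent-value");
    attributes.put("unrelated", "x");

    System.out.println(keys(attributes)); // [traceparent]
    for (String key : keys(attributes)) {
      // Every advertised key resolves to a value; no double prefix is looked up.
      System.out.println(key + " -> " + get(attributes, key));
    }
    System.out.println(get(null, "traceparent")); // null
  }
}
```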
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1439,6 +1513,39 @@ abstract Builder<T> setBadRecordErrorHandler(
abstract Write<T> build();
}
+ static class OpenTelemetryHeaderPropagator extends DoFn<PubsubMessage, PubsubMessage> {
+ void injectSpanContext(Map<String, String> attr) {
+ TextMapSetter<Map<String, String>> inject =
+ new TextMapSetter<Map<String, String>>() {
+ @Override
+ public void set(@Nullable Map<String, String> attr, String key, String value) {
+ Preconditions.checkArgumentNotNull(attr).put("googclient_" + key, value);
+ }
+ };
Review Comment:

Using `Preconditions.checkArgumentNotNull(attr)` inside the `TextMapSetter`
is risky because the setter's contract allows `attr` to be `@Nullable`. It is
safer to perform a null check and skip injection if `attr` is null.
```java
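void injectSpanContext(Map<String, String> attr) {
  TextMapSetter<Map<String, String>> inject =
      new TextMapSetter<Map<String, String>>() {
        @Override
        public void set(@Nullable Map<String, String> attr, String key, String value) {
          // Skip injection rather than throw when no carrier is supplied.
          if (attr != null) {
            attr.put("googclient_" + key, value);
          }
        }
      };
  W3CTraceContextPropagator.getInstance().inject(Context.current(), attr, inject);
}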
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1439,6 +1513,39 @@ abstract Builder<T> setBadRecordErrorHandler(
abstract Write<T> build();
}
+ static class OpenTelemetryHeaderPropagator extends DoFn<PubsubMessage, PubsubMessage> {
+ void injectSpanContext(Map<String, String> attr) {
+ TextMapSetter<Map<String, String>> inject =
+ new TextMapSetter<Map<String, String>>() {
+ @Override
+ public void set(@Nullable Map<String, String> attr, String key, String value) {
+ Preconditions.checkArgumentNotNull(attr).put("googclient_" + key, value);
+ }
+ };
+ W3CTraceContextPropagator.getInstance().inject(Context.current(), attr, inject);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @Element PubsubMessage message,
+ OutputReceiver<PubsubMessage> output,
+ PipelineOptions po) {
+ Map<String, String> attributeMap = message.getAttributeMap();
+ Map<String, String> attr = new HashMap<>(Objects.requireNonNull(attributeMap));
+ injectSpanContext(attr);
Review Comment:

`message.getAttributeMap()` can return `null` if the PubsubMessage was
constructed without attributes. Calling `Objects.requireNonNull(attributeMap)`
will throw a `NullPointerException` at runtime. We should handle the null case
gracefully by initializing an empty `HashMap`.
```suggestion
      Map<String, String> attributeMap = message.getAttributeMap();
      Map<String, String> attr =
          attributeMap == null ? new HashMap<>() : new HashMap<>(attributeMap);
      injectSpanContext(attr);
```
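The null-tolerant copy in this suggestion can be tried in isolation. This is a small sketch with a hypothetical helper name (`mutableCopy`); it assumes only that the source map may be null or unmodifiable, which is why a fresh `HashMap` is created in both branches.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class AttributeCopySketch {
  /** Returns a fresh mutable map: empty for a null source, otherwise a copy of its entries. */
  static Map<String, String> mutableCopy(Map<String, String> attributes) {
    return attributes == null ? new HashMap<>() : new HashMap<>(attributes);
  }

  public static void main(String[] args) {
    // A message without attributes: no exception, just an empty map to inject into.
    Map<String, String> fromNull = mutableCopy(null);
    fromNull.put("googclient_traceparent", "sample-traceparent-value");
    System.out.println(fromNull.size()); // 1

    // An unmodifiable source is copied, so injection does not touch the original.
    Map<String, String> source = Collections.singletonMap("k", "v");
    Map<String, String> copy = mutableCopy(source);
    copy.put("googclient_traceparent", "sample-traceparent-value");
    System.out.println(source.size() + " " + copy.size()); // 1 2
  }
}
```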
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1079,6 +1094,50 @@ public Read<T> withIdAttribute(String idAttribute) {
return toBuilder().setIdAttribute(idAttribute).build();
}
+ public Read<T> withEnableOpenTelemetryTracing() {
+ return toBuilder().setEnableOpenTelemetryTracing(true).setNeedsAttributes(true).build();
+ }
+
+ static class OpenTelemetryHeaderConsumer extends DoFn<PubsubMessage, PubsubMessage> {
+ Context extractSpanContext(PubsubMessage message) {
+ TextMapGetter<PubsubMessage> extractMessageAttributes =
+ new TextMapGetter<PubsubMessage>() {
+ @Override
+ public String get(@Nullable PubsubMessage carrier, String key) {
+ String attribute =
+ Preconditions.checkArgumentNotNull(carrier).getAttribute("googclient_" + key);
+ return attribute == null ? "" : attribute;
+ }
+
+ @Override
+ public Iterable<String> keys(PubsubMessage carrier) {
+ Map<String, String> attributeMap = carrier.getAttributeMap();
+ return attributeMap == null ? ImmutableList.of() : attributeMap.keySet();
+ }
+ };
+ return W3CTraceContextPropagator.getInstance()
+ .extract(Context.current(), message, extractMessageAttributes);
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @Element PubsubMessage message,
+ OutputReceiver<PubsubMessage> output,
+ PipelineOptions po) {
+ Context context = extractSpanContext(message);
+ TracerProvider tracerProvider =
+ po.as(OpenTelemetryTracingOptions.class).getTracerProvider();
+ Tracer tracer = Preconditions.checkArgumentNotNull(tracerProvider).get("PubSubIO");
+ Span psSub = tracer.spanBuilder("Process Element").setParent(context).startSpan();
+ try (Scope s = context.makeCurrent()) {
+ output.output(message);
+ } finally {
Review Comment:

The scope is being created using `context.makeCurrent()` instead of
`psSub.makeCurrent()`. This means the newly created span `psSub` is never made
current, and any downstream operations or nested spans created during
`output.output(message)` will not be children of `psSub` ("Process Element").
Instead, they will be direct children of the extracted parent `context`.
```suggestion
try (Scope s = psSub.makeCurrent()) {
output.output(message);
} finally {
```
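Why the choice of `makeCurrent()` receiver matters can be seen in a toy model. The sketch below does not use the OpenTelemetry API: `ScopeSketch`, its `Scope` interface, and the thread-local `CURRENT` are hypothetical stand-ins that only mimic the idea of a "current" span that downstream code picks up as its parent.

```java
// Toy model of "current span" handling; not the OpenTelemetry API.
final class ScopeSketch {
  // Name of whatever is current on this thread.
  private static final ThreadLocal<String> CURRENT = ThreadLocal.withInitial(() -> "root");

  /** Restores the previously current name when closed. */
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  static Scope makeCurrent(String name) {
    String previous = CURRENT.get();
    CURRENT.set(name);
    return () -> CURRENT.set(previous);
  }

  /** Simulates a downstream step reporting which parent it would attach a new span to. */
  static String parentSeenDownstream() {
    return CURRENT.get();
  }

  /** Makes the given name current for the duration of the downstream call. */
  static String runWith(String nameMadeCurrent) {
    try (Scope scope = makeCurrent(nameMadeCurrent)) {
      return parentSeenDownstream();
    }
  }

  public static void main(String[] args) {
    // Analogue of context.makeCurrent(): downstream work bypasses the new span.
    System.out.println(runWith("extracted parent")); // extracted parent
    // Analogue of psSub.makeCurrent(): downstream work nests under the new span.
    System.out.println(runWith("Process Element")); // Process Element
    // The previous value is restored once the scope closes.
    System.out.println(parentSeenDownstream()); // root
  }
}
```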