sjvanrossum commented on code in PR #36962:
URL: https://github.com/apache/beam/pull/36962#discussion_r2601942650
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -857,8 +857,13 @@ class BeamModulePlugin implements Plugin<Project> {
netty_tcnative_boringssl_static :
"io.netty:netty-tcnative-boringssl-static:2.0.52.Final",
netty_transport :
"io.netty:netty-transport:$netty_version",
netty_transport_native_epoll :
"io.netty:netty-transport-native-epoll:$netty_version",
- opentelemetry_api :
"io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom
sets version
+ opentelemetry_api :
"io.opentelemetry:opentelemetry-api:$opentelemetry_version",
opentelemetry_bom :
"io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", //
alpha required by extensions
+ opentelemetry_context :
"io.opentelemetry:opentelemetry-context:$opentelemetry_version",
+ opentelemetry_gcp_auth :
"io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_version-alpha",
Review Comment:
The SDK, instrumentation, and contrib artifacts are versioned independently.
contrib 1.52.0 depends on instrumentation 2.22.0, which depends on SDK 1.56.0.
If I recall correctly, the GCP BOM does not cover contrib or instrumentation,
so those versions need to be managed manually.
Feel free to pluck those changes from my PR for the portability runner.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -904,11 +987,14 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
I think `Coder.Context` would suffice here.
```suggestion
encode(windowedElem, outStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -904,11 +987,14 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(WindowedValue<T> windowedElem, OutputStream outStream,
Context context)
+ public void encode(
+ WindowedValue<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
Review Comment:
Same here.
```suggestion
Coder.Context context)
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -918,6 +1004,10 @@ public void encode(WindowedValue<T> windowedElem,
OutputStream outStream, Contex
if (metadataSupported) {
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
+ io.opentelemetry.context.Context context1 = windowedElem.getContext();
Review Comment:
```suggestion
@Nullable Context context1 = windowedElem.getContext();
```
Maybe rename `context1` to `openTelemetryContext`?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -933,16 +1023,18 @@ public void encode(WindowedValue<T> windowedElem,
OutputStream outStream, Contex
@Override
public WindowedValue<T> decode(InputStream inStream) throws
CoderException, IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public WindowedValue<T> decode(InputStream inStream, Context context)
+ public WindowedValue<T> decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
Review Comment:
```suggestion
InputStream inStream, Coder.Context context)
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1023,22 +1161,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(WindowedValue<T> windowedElem, OutputStream outStream,
Context context)
+ public void encode(
+ WindowedValue<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
Review Comment:
```suggestion
Coder.Context context)
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1023,22 +1161,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(WindowedValue<T> windowedElem, OutputStream outStream,
Context context)
+ public void encode(
+ WindowedValue<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@Override
public WindowedValue<T> decode(InputStream inStream) throws
CoderException, IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
```suggestion
return decode(inStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1134,22 +1276,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(WindowedValue<T> windowedElem, OutputStream outStream,
Context context)
+ public void encode(
+ WindowedValue<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@Override
public WindowedValue<T> decode(InputStream inStream) throws
CoderException, IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
```suggestion
return decode(inStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -105,11 +109,14 @@ public static <T> Coder<T> of(
@Override
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream
outStream)
throws IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(ValueInSingleWindow<T> windowedElem, OutputStream
outStream, Context context)
+ public void encode(
+ ValueInSingleWindow<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
Review Comment:
```suggestion
Coder.Context context)
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -129,22 +140,27 @@ public void encode(ValueInSingleWindow<T> windowedElem,
OutputStream outStream,
@Override
public ValueInSingleWindow<T> decode(InputStream inStream) throws
IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
```suggestion
return decode(inStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -129,22 +140,27 @@ public void encode(ValueInSingleWindow<T> windowedElem,
OutputStream outStream,
@Override
public ValueInSingleWindow<T> decode(InputStream inStream) throws
IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
@SuppressWarnings("IgnoredPureGetter")
- public ValueInSingleWindow<T> decode(InputStream inStream, Context
context) throws IOException {
+ public ValueInSingleWindow<T> decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context
context) throws IOException {
Instant timestamp = InstantCoder.of().decode(inStream);
BoundedWindow window = windowCoder.decode(inStream);
PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
+ io.opentelemetry.context.Context openTelemetryContext = null;
Review Comment:
```suggestion
@Nullable Context openTelemetryContext = null;
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValue.java:
##########
@@ -49,6 +50,9 @@ public interface WindowedValue<T> {
@Nullable
String getRecordId();
+ @Nullable
+ Context getContext();
Review Comment:
```suggestion
Context getOpenTelemetryContext();
```
@kennknowles same here, right?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -99,6 +108,7 @@ public static class Builder<T> implements OutputBuilder<T> {
private @MonotonicNonNull Collection<? extends BoundedWindow> windows;
private @Nullable String recordId;
private @Nullable Long recordOffset;
+ private @Nullable Context context;
Review Comment:
```suggestion
private @Nullable Context openTelemetryContext;
```
Please rename these as well, for clarity.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -933,16 +1023,18 @@ public void encode(WindowedValue<T> windowedElem,
OutputStream outStream, Contex
@Override
public WindowedValue<T> decode(InputStream inStream) throws
CoderException, IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
```suggestion
return decode(inStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1023,22 +1161,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(WindowedValue<T> windowedElem, OutputStream outStream,
Context context)
+ public void encode(
+ WindowedValue<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@Override
public WindowedValue<T> decode(InputStream inStream) throws
CoderException, IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public WindowedValue<T> decode(InputStream inStream, Context context)
+ public WindowedValue<T> decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
Review Comment:
```suggestion
InputStream inStream, Coder.Context context)
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1134,22 +1276,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
```suggestion
encode(windowedElem, outStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1023,22 +1161,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
```suggestion
encode(windowedElem, outStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1134,22 +1276,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(WindowedValue<T> windowedElem, OutputStream outStream,
Context context)
+ public void encode(
+ WindowedValue<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
Review Comment:
```suggestion
Coder.Context context)
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -105,11 +109,14 @@ public static <T> Coder<T> of(
@Override
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream
outStream)
throws IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
Review Comment:
```suggestion
encode(windowedElem, outStream, Coder.Context.NESTED);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -1134,22 +1276,26 @@ public <NewT> WindowedValueCoder<NewT>
withValueCoder(Coder<NewT> valueCoder) {
@Override
public void encode(WindowedValue<T> windowedElem, OutputStream outStream)
throws CoderException, IOException {
- encode(windowedElem, outStream, Context.NESTED);
+ encode(windowedElem, outStream,
org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public void encode(WindowedValue<T> windowedElem, OutputStream outStream,
Context context)
+ public void encode(
+ WindowedValue<T> windowedElem,
+ OutputStream outStream,
+ org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
valueCoder.encode(windowedElem.getValue(), outStream, context);
}
@Override
public WindowedValue<T> decode(InputStream inStream) throws
CoderException, IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public WindowedValue<T> decode(InputStream inStream, Context context)
+ public WindowedValue<T> decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
Review Comment:
```suggestion
InputStream inStream, Coder.Context context)
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -129,22 +140,27 @@ public void encode(ValueInSingleWindow<T> windowedElem,
OutputStream outStream,
@Override
public ValueInSingleWindow<T> decode(InputStream inStream) throws
IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
@SuppressWarnings("IgnoredPureGetter")
- public ValueInSingleWindow<T> decode(InputStream inStream, Context
context) throws IOException {
+ public ValueInSingleWindow<T> decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context
context) throws IOException {
Review Comment:
```suggestion
InputStream inStream, Coder.Context context) throws IOException {
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java:
##########
@@ -120,6 +127,10 @@ public void encode(ValueInSingleWindow<T> windowedElem,
OutputStream outStream,
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
// todo #33176 specify additional metadata in the future
+ io.opentelemetry.context.Context context1 = windowedElem.getContext();
Review Comment:
```suggestion
@Nullable Context context1 = windowedElem.getContext();
```
Maybe rename `context1` to `openTelemetryContext`?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java:
##########
@@ -933,16 +1023,18 @@ public void encode(WindowedValue<T> windowedElem,
OutputStream outStream, Contex
@Override
public WindowedValue<T> decode(InputStream inStream) throws
CoderException, IOException {
- return decode(inStream, Context.NESTED);
+ return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
}
@Override
- public WindowedValue<T> decode(InputStream inStream, Context context)
+ public WindowedValue<T> decode(
+ InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
throws CoderException, IOException {
Instant timestamp = InstantCoder.of().decode(inStream);
Collection<? extends BoundedWindow> windows =
windowsCoder.decode(inStream);
PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream);
boolean causedByDrain = false;
+ io.opentelemetry.context.Context otelContext = null;
Review Comment:
```suggestion
@Nullable Context otelContext = null;
```
Make sure the prefix is consistent across files: use either `otelContext` or
`openTelemetryContext` everywhere.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]