This is an automated email from the ASF dual-hosted git repository.
stankiewicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fcdd87f0aad [OpenTelemetry] FNHarness, Dataflow Runner v1 - set open
telemetry settings. (#38785)
fcdd87f0aad is described below
commit fcdd87f0aad9fd4213004ecd35f45ec3e46e662a
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Thu Jun 4 09:52:27 2026 +0200
[OpenTelemetry] FNHarness, Dataflow Runner v1 - set open telemetry
settings. (#38785)
---
.../runners/dataflow/worker/StreamingDataflowWorker.java | 13 +++++++++++++
.../src/main/java/org/apache/beam/fn/harness/FnHarness.java | 11 +++++++++++
2 files changed, 24 insertions(+)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 4d070da995b..4c3e58978ec 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -115,6 +115,7 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
@@ -1042,6 +1043,18 @@ public final class StreamingDataflowWorker {
WindowedValues.FullWindowedValueCoder.setMetadataSupported();
}
+ SdkHarnessOptions sdkHarnessOptions = options.as(SdkHarnessOptions.class);
+ Map<String, String> openTelemetryProperties =
sdkHarnessOptions.getOpenTelemetryProperties();
+ if (openTelemetryProperties != null && !openTelemetryProperties.isEmpty())
{
+ openTelemetryProperties.forEach(
+ (k, v) -> {
+ if (k != null && v != null) {
+ System.setProperty(k, v);
+ }
+ });
+ LOG.info("Enabled Open Telemetry with properties: {}",
openTelemetryProperties);
+ }
+
LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
StreamingDataflowWorker worker =
StreamingDataflowWorker.fromOptions(options);
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index d0724432f3c..703e726739a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -296,6 +296,17 @@ public class FnHarness {
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
CoderTranslation.verifyModelCodersRegistered();
+ SdkHarnessOptions sdkHarnessOptions =
options.as(SdkHarnessOptions.class);
+ Map<String, String> openTelemetryProperties =
sdkHarnessOptions.getOpenTelemetryProperties();
+ if (openTelemetryProperties != null &&
!openTelemetryProperties.isEmpty()) {
+ openTelemetryProperties.forEach(
+ (k, v) -> {
+ if (k != null && v != null) {
+ System.setProperty(k, v);
+ }
+ });
+ LOG.info("Enabled Open Telemetry with properties: {}",
openTelemetryProperties);
+ }
EnumMap<
BeamFnApi.InstructionRequest.RequestCase,
ThrowingFunction<InstructionRequest,
BeamFnApi.InstructionResponse.Builder>>