This is an automated email from the ASF dual-hosted git repository.

stankiewicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fcdd87f0aad [OpenTelemetry] FNHarness, Dataflow Runner v1 - set open telemetry settings. (#38785)
fcdd87f0aad is described below

commit fcdd87f0aad9fd4213004ecd35f45ec3e46e662a
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Thu Jun 4 09:52:27 2026 +0200

    [OpenTelemetry] FNHarness, Dataflow Runner v1 - set open telemetry settings. (#38785)
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java    | 13 +++++++++++++
 .../src/main/java/org/apache/beam/fn/harness/FnHarness.java | 11 +++++++++++
 2 files changed, 24 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 4d070da995b..4c3e58978ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -115,6 +115,7 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.SdkHarnessOptions;
 import org.apache.beam.sdk.util.construction.CoderTranslation;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials;
@@ -1042,6 +1043,18 @@ public final class StreamingDataflowWorker {
       WindowedValues.FullWindowedValueCoder.setMetadataSupported();
     }
 
+    SdkHarnessOptions sdkHarnessOptions = options.as(SdkHarnessOptions.class);
+    Map<String, String> openTelemetryProperties = 
sdkHarnessOptions.getOpenTelemetryProperties();
+    if (openTelemetryProperties != null && !openTelemetryProperties.isEmpty()) 
{
+      openTelemetryProperties.forEach(
+          (k, v) -> {
+            if (k != null && v != null) {
+              System.setProperty(k, v);
+            }
+          });
+      LOG.info("Enabled Open Telemetry with properties: {}", 
openTelemetryProperties);
+    }
+
     LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
     StreamingDataflowWorker worker = 
StreamingDataflowWorker.fromOptions(options);
 
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index d0724432f3c..703e726739a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -296,6 +296,17 @@ public class FnHarness {
       // Register standard file systems.
       FileSystems.setDefaultPipelineOptions(options);
       CoderTranslation.verifyModelCodersRegistered();
+      SdkHarnessOptions sdkHarnessOptions = 
options.as(SdkHarnessOptions.class);
+      Map<String, String> openTelemetryProperties = 
sdkHarnessOptions.getOpenTelemetryProperties();
+      if (openTelemetryProperties != null && 
!openTelemetryProperties.isEmpty()) {
+        openTelemetryProperties.forEach(
+            (k, v) -> {
+              if (k != null && v != null) {
+                System.setProperty(k, v);
+              }
+            });
+        LOG.info("Enabled Open Telemetry with properties: {}", 
openTelemetryProperties);
+      }
       EnumMap<
               BeamFnApi.InstructionRequest.RequestCase,
               ThrowingFunction<InstructionRequest, 
BeamFnApi.InstructionResponse.Builder>>

Reply via email to