This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new adef734100 Add metrics support for Temporal Workflow (#4095)
adef734100 is described below

commit adef734100aa3eb076a0aeea8d202c4131543d02
Author: abhishekmjain <[email protected]>
AuthorDate: Wed Feb 5 20:27:16 2025 +0530

    Add metrics support for Temporal Workflow (#4095)
---
 gobblin-temporal/build.gradle                      |   2 +
 .../temporal/GobblinTemporalConfigurationKeys.java |  10 ++
 .../client/TemporalWorkflowClientFactory.java      |  14 +++
 .../workflows/metrics/TemporalMetricsHelper.java   | 121 +++++++++++++++++++++
 .../metrics/TemporalMetricsHelperTest.java         |  85 +++++++++++++++
 gradle/scripts/defaultBuildProperties.gradle       |   4 +-
 gradle/scripts/dependencyDefinitions.gradle        |   2 +
 7 files changed, 237 insertions(+), 1 deletion(-)

diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle
index fa34245e91..19687ad3af 100644
--- a/gobblin-temporal/build.gradle
+++ b/gobblin-temporal/build.gradle
@@ -64,6 +64,8 @@ dependencies {
     }
     compile externalDependency.tdigest
     compile externalDependency."temporal-sdk"
+    compile externalDependency.micrometerCore
+    compile externalDependency.micrometerRegistry
     testCompile project(path: ':gobblin-cluster', configuration: 'tests')
     testCompile project(":gobblin-example")
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 0201cda0fb..82ea646090 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -74,4 +74,14 @@ public interface GobblinTemporalConfigurationKeys {
 
   String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + 
"polling.interval.seconds";
   int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+
+  /**
+   * Temporal metrics config properties
+   */
+  String TEMPORAL_METRICS_PREFIX = PREFIX + "metrics.";
+  String TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT = TEMPORAL_METRICS_PREFIX + 
"otlp";
+  String TEMPORAL_METRICS_OTLP_HEADERS_KEY = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers";
+  String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
+  int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
+  String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = 
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
index 030d4aae18..c8a1052ae9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
@@ -30,6 +30,8 @@ import org.apache.commons.io.FileUtils;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.uber.m3.tally.RootScopeBuilder;
+import com.uber.m3.tally.Scope;
 
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
@@ -41,7 +43,10 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
 
 import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 public class TemporalWorkflowClientFactory {
@@ -100,10 +105,19 @@ public class TemporalWorkflowClientFactory {
                 .ciphers(SSL_CONFIG_DEFAULT_CIPHER_SUITES)
                 .build();
 
+        // Initialize metrics
+        int reportInterval = ConfigUtils.getInt(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
+        Scope metricsScope = new RootScopeBuilder()
+            .reporter(TemporalMetricsHelper.getStatsReporter(config))
+            .tags(TemporalMetricsHelper.getDimensions(config))
+            .reportEvery(com.uber.m3.util.Duration.ofSeconds(reportInterval));
+
         WorkflowServiceStubsOptions options = 
WorkflowServiceStubsOptions.newBuilder()
                 .setTarget(connectionUri)
                 .setEnableHttps(true)
                 .setSslContext(sslContext)
+                .setMetricsScope(metricsScope)
                 .build();
         return WorkflowServiceStubs.newServiceStubs(options);
     }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java
new file mode 100644
index 0000000000..443c5613da
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.jetbrains.annotations.NotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import com.uber.m3.tally.StatsReporter;
+
+import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.registry.otlp.OtlpConfig;
+import io.micrometer.registry.otlp.OtlpMeterRegistry;
+import io.temporal.common.reporter.MicrometerClientStatsReporter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class TemporalMetricsHelper {
+
+  /**
+   * Retrieves a map of dimension names and their corresponding values from 
the provided config.
+   * The dimensions are defined as a comma-separated string in the config, and 
the method
+   * fetches the corresponding values for each dimension.
+   * A missing dimension in config will have empty string as value.
+   *
+   * @param config Config object
+   * @return a map where the key is the dimension name and the value is the 
corresponding value from the config
+   */
+  public static Map<String, String> getDimensions(Config config) {
+    String dimensionsString = ConfigUtils.getString(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY, "");
+
+    // Split the string by "," and create a map by fetching values from config
+    return Arrays.stream(dimensionsString.split(","))
+        .map(String::trim)
+        .filter(StringUtils::isNotBlank)
+        .collect(Collectors.toMap(key -> key, key -> 
ConfigUtils.getString(config, key, ""), (l, r)-> r));
+  }
+
+  /**
+   * Creates and returns a {@link StatsReporter} instance configured with an 
{@link OtlpMeterRegistry}.
+   * This reporter can be used to report metrics via the OpenTelemetry 
Protocol (OTLP) to a metrics backend.
+   *
+   * @param config Config object
+   * @return a {@link StatsReporter} instance, configured with an OTLP meter 
registry and ready to report metrics.
+   */
+  public static StatsReporter getStatsReporter(Config config) {
+    OtlpConfig otlpConfig = getOtlpConfig(config);
+    MeterRegistry meterRegistry = new OtlpMeterRegistry(otlpConfig, 
Clock.SYSTEM);
+    return new MicrometerClientStatsReporter(meterRegistry);
+  }
+
+  @VisibleForTesting
+  static OtlpConfig getOtlpConfig(Config config) {
+    return new OtlpConfig() {
+      @Override
+      public String get(@NotNull String key) {
+        return ConfigUtils.getString(config, key, null);
+      }
+
+      @NotNull
+      @Override
+      public String prefix() {
+        return 
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT;
+      }
+
+      @NotNull
+      @Override
+      public Map<String, String> headers() {
+        String headers = 
get(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_HEADERS_KEY);
+        return parseHeaders(headers);
+      }
+
+      @NotNull
+      @Override
+      public Duration step() {
+        int reportInterval = ConfigUtils.getInt(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
+        return Duration.ofSeconds(reportInterval);
+      }
+    };
+  }
+
+  private static Map<String, String> parseHeaders(String headersString) {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      return mapper.readValue(headersString, HashMap.class);
+    } catch (Exception e) {
+      String errMsg = "Failed to parse headers: " + headersString;
+      log.error(errMsg, e);
+      throw new RuntimeException(errMsg);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java
new file mode 100644
index 0000000000..1435d0f4df
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+
+import io.micrometer.registry.otlp.OtlpConfig;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/** Test {@link TemporalMetricsHelper} */
+public class TemporalMetricsHelperTest {
+
+  private Config config;
+
+  @BeforeClass
+  public void setup() {
+    config = ConfigFactory.empty()
+        .withValue("prefix", 
ConfigValueFactory.fromAnyRef("gobblin.temporal.metrics.otlp"))
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT
 + ".headers",
+            ConfigValueFactory.fromAnyRef("{\"abc\":\"123\", 
\"pqr\":\"456\"}"))
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT
 + ".resourceAttributes",
+            ConfigValueFactory.fromAnyRef("service.name=gobblin-service"))
+        .withValue("dim1", ConfigValueFactory.fromAnyRef("val1"))
+        .withValue("dim2", ConfigValueFactory.fromAnyRef("val2"))
+        
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT
 + ".dimensions",
+            ConfigValueFactory.fromAnyRef("dim1,dim2,missingDimension"));
+  }
+
+  @Test
+  public void testGetDimensions() {
+    Map<String, String> dimensions = 
TemporalMetricsHelper.getDimensions(config);
+
+    Assert.assertNotNull(dimensions);
+    Assert.assertEquals(3, dimensions.size());
+    Assert.assertEquals("val1", dimensions.get("dim1"));
+    Assert.assertEquals("val2", dimensions.get("dim2"));
+    Assert.assertEquals("", dimensions.get("missingDimension"));
+  }
+
+  @Test
+  public void testGetDimensionsEmptyConfig() {
+    Map<String, String> dimensions = 
TemporalMetricsHelper.getDimensions(ConfigFactory.empty());
+
+    Assert.assertNotNull(dimensions);
+    Assert.assertEquals(0, dimensions.size());
+  }
+
+  @Test
+  public void testGetOtlpConfig() {
+    OtlpConfig otlpConfig = TemporalMetricsHelper.getOtlpConfig(config);
+
+    Map<String, String> headers = otlpConfig.headers();
+    Assert.assertNotNull(headers);
+    Assert.assertEquals(2, headers.size());
+    Assert.assertEquals("123", headers.get("abc"));
+    Assert.assertEquals("456", headers.get("pqr"));
+
+    Assert.assertEquals("gobblin-service", 
otlpConfig.resourceAttributes().get("service.name"));
+  }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle 
b/gradle/scripts/defaultBuildProperties.gradle
index c3ff18b543..2263dcdfa0 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -41,7 +41,8 @@ def BuildProperties BUILD_PROPERTIES = new 
BuildProperties(project)
     .register(new BuildProperty("publishToMaven", false, "Enable publishing of 
artifacts to a central Maven repository"))
     .register(new BuildProperty("publishToNexus", false, "Enable publishing of 
artifacts to Nexus"))
     .register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce 
dependencies version"))
-    .register(new BuildProperty("openTelemetryVersion", "1.29.0", 
"OpenTelemetry dependencies version"))
+    .register(new BuildProperty("openTelemetryVersion", "1.30.0", 
"OpenTelemetry dependencies version"))
+    .register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer 
dependencies version"))
 task buildProperties(description: 'Lists main properties that can be used to 
customize the build') {
   doLast {
     BUILD_PROPERTIES.printHelp();
@@ -74,5 +75,6 @@ BUILD_PROPERTIES.ensureDefined('kafka1Version')
 BUILD_PROPERTIES.ensureDefined('pegasusVersion')
 BUILD_PROPERTIES.ensureDefined('salesforceVersion')
 BUILD_PROPERTIES.ensureDefined('openTelemetryVersion')
+BUILD_PROPERTIES.ensureDefined('micrometerVersion')
 
 ext.buildProperties = BUILD_PROPERTIES
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index e4b328c5e1..3d71988c14 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -120,6 +120,8 @@ ext.externalDependency = [
     "opentelemetrySdk": "io.opentelemetry:opentelemetry-sdk:" + 
openTelemetryVersion,
     "opentelemetryExporterOtlp": 
"io.opentelemetry:opentelemetry-exporter-otlp:" + openTelemetryVersion,
     "opentelemetrySdkTesting": "io.opentelemetry:opentelemetry-sdk-testing:" + 
openTelemetryVersion,
+    "micrometerCore": "io.micrometer:micrometer-core:" + micrometerVersion,
+    "micrometerRegistry": "io.micrometer:micrometer-registry-otlp:" + 
micrometerVersion,
     "jsch": "com.jcraft:jsch:0.1.54",
     "jdo2": "javax.jdo:jdo2-api:2.1",
     "azkaban": "com.linkedin.azkaban:azkaban:2.5.0",

Reply via email to