This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new adef734100 Add metrics support for Temporal Workflow (#4095)
adef734100 is described below
commit adef734100aa3eb076a0aeea8d202c4131543d02
Author: abhishekmjain <[email protected]>
AuthorDate: Wed Feb 5 20:27:16 2025 +0530
Add metrics support for Temporal Workflow (#4095)
---
gobblin-temporal/build.gradle | 2 +
.../temporal/GobblinTemporalConfigurationKeys.java | 10 ++
.../client/TemporalWorkflowClientFactory.java | 14 +++
.../workflows/metrics/TemporalMetricsHelper.java | 121 +++++++++++++++++++++
.../metrics/TemporalMetricsHelperTest.java | 85 +++++++++++++++
gradle/scripts/defaultBuildProperties.gradle | 4 +-
gradle/scripts/dependencyDefinitions.gradle | 2 +
7 files changed, 237 insertions(+), 1 deletion(-)
diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle
index fa34245e91..19687ad3af 100644
--- a/gobblin-temporal/build.gradle
+++ b/gobblin-temporal/build.gradle
@@ -64,6 +64,8 @@ dependencies {
}
compile externalDependency.tdigest
compile externalDependency."temporal-sdk"
+ compile externalDependency.micrometerCore
+ compile externalDependency.micrometerRegistry
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 0201cda0fb..82ea646090 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -74,4 +74,14 @@ public interface GobblinTemporalConfigurationKeys {
String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX +
"polling.interval.seconds";
int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+
+ /**
+ * Temporal metrics config properties
+ */
+ String TEMPORAL_METRICS_PREFIX = PREFIX + "metrics.";
+ String TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT = TEMPORAL_METRICS_PREFIX +
"otlp";
+ String TEMPORAL_METRICS_OTLP_HEADERS_KEY =
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers";
+ String TEMPORAL_METRICS_REPORT_INTERVAL_SECS =
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
+ int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
+ String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY =
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
index 030d4aae18..c8a1052ae9 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
@@ -30,6 +30,8 @@ import org.apache.commons.io.FileUtils;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.uber.m3.tally.RootScopeBuilder;
+import com.uber.m3.tally.Scope;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
@@ -41,7 +43,10 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
+import org.apache.gobblin.util.ConfigUtils;
public class TemporalWorkflowClientFactory {
@@ -100,10 +105,19 @@ public class TemporalWorkflowClientFactory {
.ciphers(SSL_CONFIG_DEFAULT_CIPHER_SUITES)
.build();
+ // Initialize metrics
+ int reportInterval = ConfigUtils.getInt(config,
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
+ Scope metricsScope = new RootScopeBuilder()
+ .reporter(TemporalMetricsHelper.getStatsReporter(config))
+ .tags(TemporalMetricsHelper.getDimensions(config))
+ .reportEvery(com.uber.m3.util.Duration.ofSeconds(reportInterval));
+
WorkflowServiceStubsOptions options =
WorkflowServiceStubsOptions.newBuilder()
.setTarget(connectionUri)
.setEnableHttps(true)
.setSslContext(sslContext)
+ .setMetricsScope(metricsScope)
.build();
return WorkflowServiceStubs.newServiceStubs(options);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java
new file mode 100644
index 0000000000..443c5613da
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.metrics;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.jetbrains.annotations.NotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import com.uber.m3.tally.StatsReporter;
+
+import io.micrometer.core.instrument.Clock;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.registry.otlp.OtlpConfig;
+import io.micrometer.registry.otlp.OtlpMeterRegistry;
+import io.temporal.common.reporter.MicrometerClientStatsReporter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class TemporalMetricsHelper {
+
+ /**
+ * Retrieves a map of dimension names and their corresponding values from
the provided config.
+ * The dimensions are defined as a comma-separated string in the config, and
the method
+ * fetches the corresponding values for each dimension.
+ * A missing dimension in config will have empty string as value.
+ *
+ * @param config Config object
+ * @return a map where the key is the dimension name and the value is the
corresponding value from the config
+ */
+ public static Map<String, String> getDimensions(Config config) {
+ String dimensionsString = ConfigUtils.getString(config,
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY, "");
+
+ // Split the string by "," and create a map by fetching values from config
+ return Arrays.stream(dimensionsString.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotBlank)
+ .collect(Collectors.toMap(key -> key, key ->
ConfigUtils.getString(config, key, ""), (l, r)-> r));
+ }
+
+ /**
+ * Creates and returns a {@link StatsReporter} instance configured with an
{@link OtlpMeterRegistry}.
+ * This reporter can be used to report metrics via the OpenTelemetry
Protocol (OTLP) to a metrics backend.
+ *
+ * @param config Config object
+ * @return a {@link StatsReporter} instance, configured with an OTLP meter
registry and ready to report metrics.
+ */
+ public static StatsReporter getStatsReporter(Config config) {
+ OtlpConfig otlpConfig = getOtlpConfig(config);
+ MeterRegistry meterRegistry = new OtlpMeterRegistry(otlpConfig,
Clock.SYSTEM);
+ return new MicrometerClientStatsReporter(meterRegistry);
+ }
+
+ @VisibleForTesting
+ static OtlpConfig getOtlpConfig(Config config) {
+ return new OtlpConfig() {
+ @Override
+ public String get(@NotNull String key) {
+ return ConfigUtils.getString(config, key, null);
+ }
+
+ @NotNull
+ @Override
+ public String prefix() {
+ return
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT;
+ }
+
+ @NotNull
+ @Override
+ public Map<String, String> headers() {
+ String headers =
get(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_HEADERS_KEY);
+ return parseHeaders(headers);
+ }
+
+ @NotNull
+ @Override
+ public Duration step() {
+ int reportInterval = ConfigUtils.getInt(config,
GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS);
+ return Duration.ofSeconds(reportInterval);
+ }
+ };
+ }
+
+ private static Map<String, String> parseHeaders(String headersString) {
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(headersString, HashMap.class);
+ } catch (Exception e) {
+ String errMsg = "Failed to parse headers: " + headersString;
+ log.error(errMsg, e);
+ throw new RuntimeException(errMsg);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java
new file mode 100644
index 0000000000..1435d0f4df
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.workflows.metrics;
+
+
+import io.micrometer.registry.otlp.OtlpConfig;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/** Test {@link TemporalMetricsHelper} */
+public class TemporalMetricsHelperTest {
+
+ private Config config;
+
+ @BeforeClass
+ public void setup() {
+ config = ConfigFactory.empty()
+ .withValue("prefix",
ConfigValueFactory.fromAnyRef("gobblin.temporal.metrics.otlp"))
+
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT
+ ".headers",
+ ConfigValueFactory.fromAnyRef("{\"abc\":\"123\",
\"pqr\":\"456\"}"))
+
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT
+ ".resourceAttributes",
+ ConfigValueFactory.fromAnyRef("service.name=gobblin-service"))
+ .withValue("dim1", ConfigValueFactory.fromAnyRef("val1"))
+ .withValue("dim2", ConfigValueFactory.fromAnyRef("val2"))
+
.withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT
+ ".dimensions",
+ ConfigValueFactory.fromAnyRef("dim1,dim2,missingDimension"));
+ }
+
+ @Test
+ public void testGetDimensions() {
+ Map<String, String> dimensions =
TemporalMetricsHelper.getDimensions(config);
+
+ Assert.assertNotNull(dimensions);
+ Assert.assertEquals(3, dimensions.size());
+ Assert.assertEquals("val1", dimensions.get("dim1"));
+ Assert.assertEquals("val2", dimensions.get("dim2"));
+ Assert.assertEquals("", dimensions.get("missingDimension"));
+ }
+
+ @Test
+ public void testGetDimensionsEmptyConfig() {
+ Map<String, String> dimensions =
TemporalMetricsHelper.getDimensions(ConfigFactory.empty());
+
+ Assert.assertNotNull(dimensions);
+ Assert.assertEquals(0, dimensions.size());
+ }
+
+ @Test
+ public void testGetOtlpConfig() {
+ OtlpConfig otlpConfig = TemporalMetricsHelper.getOtlpConfig(config);
+
+ Map<String, String> headers = otlpConfig.headers();
+ Assert.assertNotNull(headers);
+ Assert.assertEquals(2, headers.size());
+ Assert.assertEquals("123", headers.get("abc"));
+ Assert.assertEquals("456", headers.get("pqr"));
+
+ Assert.assertEquals("gobblin-service",
otlpConfig.resourceAttributes().get("service.name"));
+ }
+}
diff --git a/gradle/scripts/defaultBuildProperties.gradle
b/gradle/scripts/defaultBuildProperties.gradle
index c3ff18b543..2263dcdfa0 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -41,7 +41,8 @@ def BuildProperties BUILD_PROPERTIES = new
BuildProperties(project)
.register(new BuildProperty("publishToMaven", false, "Enable publishing of
artifacts to a central Maven repository"))
.register(new BuildProperty("publishToNexus", false, "Enable publishing of
artifacts to Nexus"))
.register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce
dependencies version"))
- .register(new BuildProperty("openTelemetryVersion", "1.29.0",
"OpenTelemetry dependencies version"))
+ .register(new BuildProperty("openTelemetryVersion", "1.30.0",
"OpenTelemetry dependencies version"))
+ .register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer
dependencies version"))
task buildProperties(description: 'Lists main properties that can be used to
customize the build') {
doLast {
BUILD_PROPERTIES.printHelp();
@@ -74,5 +75,6 @@ BUILD_PROPERTIES.ensureDefined('kafka1Version')
BUILD_PROPERTIES.ensureDefined('pegasusVersion')
BUILD_PROPERTIES.ensureDefined('salesforceVersion')
BUILD_PROPERTIES.ensureDefined('openTelemetryVersion')
+BUILD_PROPERTIES.ensureDefined('micrometerVersion')
ext.buildProperties = BUILD_PROPERTIES
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index e4b328c5e1..3d71988c14 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -120,6 +120,8 @@ ext.externalDependency = [
"opentelemetrySdk": "io.opentelemetry:opentelemetry-sdk:" +
openTelemetryVersion,
"opentelemetryExporterOtlp":
"io.opentelemetry:opentelemetry-exporter-otlp:" + openTelemetryVersion,
"opentelemetrySdkTesting": "io.opentelemetry:opentelemetry-sdk-testing:" +
openTelemetryVersion,
+ "micrometerCore": "io.micrometer:micrometer-core:" + micrometerVersion,
+ "micrometerRegistry": "io.micrometer:micrometer-registry-otlp:" +
micrometerVersion,
"jsch": "com.jcraft:jsch:0.1.54",
"jdo2": "javax.jdo:jdo2-api:2.1",
"azkaban": "com.linkedin.azkaban:azkaban:2.5.0",