This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 23c4481c7d [GOBBLIN-2194] Close temporal metrics scope on job completion (#4097)
23c4481c7d is described below
commit 23c4481c7d00bdc3fcef3e1aaad8679acca951cf
Author: abhishekmjain <[email protected]>
AuthorDate: Mon Feb 17 23:07:12 2025 +0530
[GOBBLIN-2194] Close temporal metrics scope on job completion (#4097)
Close temporal metrics scope on job completion
Override close in GobblinTemporalJobLauncher
---
.../cluster/GobblinTemporalTaskRunner.java | 13 ++++--
.../temporal/joblauncher/GobblinJobLauncher.java | 8 +---
.../joblauncher/GobblinTemporalJobLauncher.java | 25 ++++++++--
.../joblauncher/GobblinTemporalJobScheduler.java | 21 +++++----
.../client/TemporalWorkflowClientFactory.java | 5 +-
.../service/ManagedWorkflowServiceStubs.java | 54 ++++++++++++++++++++++
.../GobblinTemporalJobLauncherTest.java | 4 +-
7 files changed, 101 insertions(+), 29 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index 65d91bab30..c8091068a9 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -49,7 +49,6 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import io.temporal.client.WorkflowClient;
-import io.temporal.serviceclient.WorkflowServiceStubs;
import lombok.Getter;
import lombok.Setter;
@@ -74,6 +73,7 @@ import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
@@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
private final boolean isMetricReportingFailureFatal;
private final boolean isEventReportingFailureFatal;
private final List<TemporalWorker> workers;
+ private final ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
public GobblinTemporalTaskRunner(String applicationName,
String applicationId,
@@ -163,6 +164,9 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
this.workers = new ArrayList<>();
+ String connectionUri =
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
+ this.managedWorkflowServiceStubs =
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
+
logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {},
taskRunnerId {}, config {}, appWorkDir {}",
this.isTaskDriver ? "taskDriver" : "worker",
applicationName,
@@ -241,12 +245,10 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
private TemporalWorker initiateWorker() throws Exception {
logger.info("Starting Temporal Worker");
- String connectionUri =
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
- WorkflowServiceStubs service =
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
-
String namespace = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
- WorkflowClient client =
TemporalWorkflowClientFactory.createClientInstance(service, namespace);
+ WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
+ managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS,
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
@@ -293,6 +295,7 @@ public class GobblinTemporalTaskRunner implements
StandardMetricsBridge {
}
workers.forEach(TemporalWorker::shutdown);
+ managedWorkflowServiceStubs.close();
logger.info("All services are stopped.");
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index c358b368a3..ef0e9de0bc 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -137,13 +137,9 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
@Override
public void close() throws IOException {
try {
- executeCancellation();
+ cleanupWorkingDirectory();
} finally {
- try {
- cleanupWorkingDirectory();
- } finally {
- super.close();
- }
+ super.close();
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index 2d17fe20a3..26feccee1f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.joblauncher;
+import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,7 +34,6 @@ import
io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowStub;
-import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;
import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +48,7 @@ import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.*;
@@ -74,7 +75,7 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
private static final Logger log =
Workflow.getLogger(GobblinTemporalJobLauncher.class);
private static final int TERMINATION_TIMEOUT_SECONDS = 3;
- protected WorkflowServiceStubs workflowServiceStubs;
+ protected ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
protected String namespace;
@@ -87,10 +88,10 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
log.info("GobblinTemporalJobLauncher: appWorkDir {}; jobProps {}",
appWorkDir, jobProps);
String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
- this.workflowServiceStubs = createServiceInstance(connectionUri);
+ this.managedWorkflowServiceStubs = createServiceInstance(connectionUri);
this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE,
DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
- this.client = createClientInstance(workflowServiceStubs, namespace);
+ this.client =
createClientInstance(managedWorkflowServiceStubs.getWorkflowServiceStubs(),
namespace);
this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE,
DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
@@ -139,7 +140,8 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
.setNamespace(this.namespace)
.setExecution(workflowStub.getExecution())
.build();
- DescribeWorkflowExecutionResponse response =
workflowServiceStubs.blockingStub().describeWorkflowExecution(request);
+ DescribeWorkflowExecutionResponse response =
managedWorkflowServiceStubs.getWorkflowServiceStubs()
+ .blockingStub().describeWorkflowExecution(request);
WorkflowExecutionStatus status;
try {
@@ -188,4 +190,17 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ // Cancel the job before closing the service stubs: close() shuts down the service, which cancellation still requires.
+ cancelJob(jobListener);
+ } catch (Exception e) {
+ log.error("Exception occurred while cancelling job", e);
+ } finally {
+ managedWorkflowServiceStubs.close();
+ super.close();
+ }
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index 34a1cec4dc..fe589e18d1 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -209,16 +209,17 @@ public class GobblinTemporalJobScheduler extends
JobScheduler implements Standar
} else {
LOGGER.info("No job schedule found, so running job " + jobUri);
GobblinTemporalJobLauncherListener listener = new
GobblinTemporalJobLauncherListener(this.launcherMetrics);
- JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- launcher.cancelJob(listener);
- } catch (JobException e) {
- LOGGER.error("Failed to cancel the job during shutdown", e);
- throw new RuntimeException(e);
- }
- }));
- launcher.launchJob(listener);
+ try (JobLauncher launcher =
buildJobLauncher(newJobArrival.getJobConfig())) {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ launcher.cancelJob(listener);
+ } catch (JobException e) {
+ LOGGER.error("Failed to cancel the job during shutdown", e);
+ throw new RuntimeException(e);
+ }
+ }));
+ launcher.launchJob(listener);
+ }
}
} catch (Exception je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
index c8a1052ae9..ec63c79014 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
@@ -46,11 +46,12 @@ import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
+import
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.ConfigUtils;
public class TemporalWorkflowClientFactory {
- public static WorkflowServiceStubs createServiceInstance(String
connectionUri) throws Exception {
+ public static ManagedWorkflowServiceStubs createServiceInstance(String
connectionUri) throws Exception {
GobblinClusterUtils.setSystemProperties(ConfigFactory.load());
Config config =
GobblinClusterUtils.addDynamicConfig(ConfigFactory.load());
String SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT =
"gobblin.kafka.sharedConfig.";
@@ -119,7 +120,7 @@ public class TemporalWorkflowClientFactory {
.setSslContext(sslContext)
.setMetricsScope(metricsScope)
.build();
- return WorkflowServiceStubs.newServiceStubs(options);
+ return new
ManagedWorkflowServiceStubs(WorkflowServiceStubs.newServiceStubs(options));
}
public static WorkflowClient createClientInstance(WorkflowServiceStubs
service, String namespace) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java
new file mode 100644
index 0000000000..e06c6042b6
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.service;
+
+import java.io.Closeable;
+
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link Closeable} wrapper around {@link WorkflowServiceStubs} that owns the lifecycle of the wrapped stubs.
+ * {@link #close()} closes the stubs' metrics scope and then shuts the stubs down; exceptions from either step are logged, not rethrown.
+ */
+@Getter // Lombok: exposes the wrapped stubs via getWorkflowServiceStubs()
+@Slf4j
+public class ManagedWorkflowServiceStubs implements Closeable {
+ private final WorkflowServiceStubs workflowServiceStubs;
+
+ public ManagedWorkflowServiceStubs(WorkflowServiceStubs serviceStubs) {
+ this.workflowServiceStubs = serviceStubs;
+ }
+
+ @Override
+ public void close() {
+ try {
+ workflowServiceStubs.getOptions().getMetricsScope().close(); // close the metrics scope first, before the stubs are shut down
+ }
+ catch (Exception e) { // best-effort: log and continue so that shutdown() below still runs
+ log.error("Exception occurred while closing metrics scope", e);
+ }
+ try {
+ workflowServiceStubs.shutdown();
+ }
+ catch (Exception e) { // swallowed deliberately: this close() never propagates an Exception to callers
+ log.error("Exception occurred while shutting down WorkflowServiceStubs",
e);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
index 98d0379b65..9a6446d95c 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
@@ -49,6 +49,7 @@ import org.apache.gobblin.runtime.locks.FileBasedJobLock;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
import org.apache.gobblin.util.JobLauncherUtils;
import static org.mockito.Mockito.mock;
@@ -83,6 +84,7 @@ public class GobblinTemporalJobLauncherTest {
@BeforeClass
public void setUp() throws Exception {
mockServiceStubs = mock(WorkflowServiceStubs.class);
+ ManagedWorkflowServiceStubs managedWorkflowServiceStubs = new
ManagedWorkflowServiceStubs(mockServiceStubs);
mockClient = mock(WorkflowClient.class);
mockExecutionInfo = mock(WorkflowExecutionInfo.class);
DescribeWorkflowExecutionResponse mockResponse =
mock(DescribeWorkflowExecutionResponse.class);
@@ -93,7 +95,7 @@ public class GobblinTemporalJobLauncherTest {
mockWorkflowClientFactory =
Mockito.mockStatic(TemporalWorkflowClientFactory.class);
mockWorkflowClientFactory.when(() ->
TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString()))
- .thenReturn(mockServiceStubs);
+ .thenReturn(managedWorkflowServiceStubs);
mockWorkflowClientFactory.when(() ->
TemporalWorkflowClientFactory.createClientInstance(Mockito.any(),
Mockito.anyString()))
.thenReturn(mockClient);