This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 23c4481c7d [GOBBLIN-2194] Close temporal metrics scope on job 
completion (#4097)
23c4481c7d is described below

commit 23c4481c7d00bdc3fcef3e1aaad8679acca951cf
Author: abhishekmjain <[email protected]>
AuthorDate: Mon Feb 17 23:07:12 2025 +0530

    [GOBBLIN-2194] Close temporal metrics scope on job completion (#4097)
    
    Close temporal metrics scope on job completion
    Override close in GobblinTemporalJobLauncher
---
 .../cluster/GobblinTemporalTaskRunner.java         | 13 ++++--
 .../temporal/joblauncher/GobblinJobLauncher.java   |  8 +---
 .../joblauncher/GobblinTemporalJobLauncher.java    | 25 ++++++++--
 .../joblauncher/GobblinTemporalJobScheduler.java   | 21 +++++----
 .../client/TemporalWorkflowClientFactory.java      |  5 +-
 .../service/ManagedWorkflowServiceStubs.java       | 54 ++++++++++++++++++++++
 .../GobblinTemporalJobLauncherTest.java            |  4 +-
 7 files changed, 101 insertions(+), 29 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index 65d91bab30..c8091068a9 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -49,7 +49,6 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import io.temporal.client.WorkflowClient;
-import io.temporal.serviceclient.WorkflowServiceStubs;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -74,6 +73,7 @@ import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import 
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.FileUtils;
@@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
   private final boolean isMetricReportingFailureFatal;
   private final boolean isEventReportingFailureFatal;
   private final List<TemporalWorker> workers;
+  private final ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
 
   public GobblinTemporalTaskRunner(String applicationName,
       String applicationId,
@@ -163,6 +164,9 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
         ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
     this.workers = new ArrayList<>();
 
+    String connectionUri = 
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
+    this.managedWorkflowServiceStubs = 
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
+
     logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, 
taskRunnerId {}, config {}, appWorkDir {}",
         this.isTaskDriver ? "taskDriver" : "worker",
         applicationName,
@@ -241,12 +245,10 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
   private TemporalWorker initiateWorker() throws Exception {
     logger.info("Starting Temporal Worker");
 
-    String connectionUri = 
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
-    WorkflowServiceStubs service = 
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
-
     String namespace = ConfigUtils.getString(clusterConfig, 
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
             
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
-    WorkflowClient client = 
TemporalWorkflowClientFactory.createClientInstance(service, namespace);
+    WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(
+        managedWorkflowServiceStubs.getWorkflowServiceStubs(), namespace);
 
     String workerClassName = ConfigUtils.getString(clusterConfig,
         GobblinTemporalConfigurationKeys.WORKER_CLASS, 
GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
@@ -293,6 +295,7 @@ public class GobblinTemporalTaskRunner implements 
StandardMetricsBridge {
     }
 
     workers.forEach(TemporalWorker::shutdown);
+    managedWorkflowServiceStubs.close();
 
     logger.info("All services are stopped.");
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index c358b368a3..ef0e9de0bc 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -137,13 +137,9 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
   @Override
   public void close() throws IOException {
     try {
-      executeCancellation();
+      cleanupWorkingDirectory();
     } finally {
-      try {
-        cleanupWorkingDirectory();
-      } finally {
-        super.close();
-      }
+      super.close();
     }
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index 2d17fe20a3..26feccee1f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.joblauncher;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,7 +34,6 @@ import 
io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
 import io.temporal.client.WorkflowClient;
 import io.temporal.client.WorkflowFailedException;
 import io.temporal.client.WorkflowStub;
-import io.temporal.serviceclient.WorkflowServiceStubs;
 import io.temporal.workflow.Workflow;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +48,7 @@ import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import 
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys.*;
@@ -74,7 +75,7 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
   private static final Logger log = 
Workflow.getLogger(GobblinTemporalJobLauncher.class);
   private static final int TERMINATION_TIMEOUT_SECONDS = 3;
 
-  protected WorkflowServiceStubs workflowServiceStubs;
+  protected ManagedWorkflowServiceStubs managedWorkflowServiceStubs;
   protected WorkflowClient client;
   protected String queueName;
   protected String namespace;
@@ -87,10 +88,10 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
     log.info("GobblinTemporalJobLauncher: appWorkDir {}; jobProps {}", 
appWorkDir, jobProps);
 
     String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
-    this.workflowServiceStubs = createServiceInstance(connectionUri);
+    this.managedWorkflowServiceStubs = createServiceInstance(connectionUri);
 
     this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, 
DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
-    this.client = createClientInstance(workflowServiceStubs, namespace);
+    this.client = 
createClientInstance(managedWorkflowServiceStubs.getWorkflowServiceStubs(), 
namespace);
 
     this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, 
DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);
 
@@ -139,7 +140,8 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
           .setNamespace(this.namespace)
           .setExecution(workflowStub.getExecution())
           .build();
-      DescribeWorkflowExecutionResponse response = 
workflowServiceStubs.blockingStub().describeWorkflowExecution(request);
+      DescribeWorkflowExecutionResponse response = 
managedWorkflowServiceStubs.getWorkflowServiceStubs()
+          .blockingStub().describeWorkflowExecution(request);
 
       WorkflowExecutionStatus status;
       try {
@@ -188,4 +190,17 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
   protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
     log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
   }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      // Calling cancel before calling close on serviceStubs as it will 
shutdown the service which is required during cancellation.
+      cancelJob(jobListener);
+    } catch (Exception e) {
+      log.error("Exception occurred while cancelling job", e);
+    } finally {
+      managedWorkflowServiceStubs.close();
+      super.close();
+    }
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index 34a1cec4dc..fe589e18d1 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -209,16 +209,17 @@ public class GobblinTemporalJobScheduler extends 
JobScheduler implements Standar
       } else {
         LOGGER.info("No job schedule found, so running job " + jobUri);
         GobblinTemporalJobLauncherListener listener = new 
GobblinTemporalJobLauncherListener(this.launcherMetrics);
-        JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-          try {
-            launcher.cancelJob(listener);
-          } catch (JobException e) {
-            LOGGER.error("Failed to cancel the job during shutdown", e);
-            throw new RuntimeException(e);
-          }
-        }));
-        launcher.launchJob(listener);
+        try (JobLauncher launcher = 
buildJobLauncher(newJobArrival.getJobConfig())) {
+          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+              launcher.cancelJob(listener);
+            } catch (JobException e) {
+              LOGGER.error("Failed to cancel the job during shutdown", e);
+              throw new RuntimeException(e);
+            }
+          }));
+          launcher.launchJob(listener);
+        }
       }
     } catch (Exception je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
index c8a1052ae9..ec63c79014 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
@@ -46,11 +46,12 @@ import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper;
+import 
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
 import org.apache.gobblin.util.ConfigUtils;
 
 
 public class TemporalWorkflowClientFactory {
-    public static WorkflowServiceStubs createServiceInstance(String 
connectionUri) throws Exception {
+    public static ManagedWorkflowServiceStubs createServiceInstance(String 
connectionUri) throws Exception {
         GobblinClusterUtils.setSystemProperties(ConfigFactory.load());
         Config config = 
GobblinClusterUtils.addDynamicConfig(ConfigFactory.load());
         String SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT = 
"gobblin.kafka.sharedConfig.";
@@ -119,7 +120,7 @@ public class TemporalWorkflowClientFactory {
                 .setSslContext(sslContext)
                 .setMetricsScope(metricsScope)
                 .build();
-        return WorkflowServiceStubs.newServiceStubs(options);
+        return new 
ManagedWorkflowServiceStubs(WorkflowServiceStubs.newServiceStubs(options));
     }
 
     public static WorkflowClient createClientInstance(WorkflowServiceStubs 
service, String namespace) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java
new file mode 100644
index 0000000000..e06c6042b6
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/service/ManagedWorkflowServiceStubs.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.workflows.service;
+
+import java.io.Closeable;
+
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A wrapper class of {@link WorkflowServiceStubs} that implements the 
Closeable interface.
+ * It manages the lifecycle of {@link WorkflowServiceStubs}, ensuring proper 
shutdown and resource cleanup.
+ */
+@Getter
+@Slf4j
+public class ManagedWorkflowServiceStubs implements Closeable {
+  private final WorkflowServiceStubs workflowServiceStubs;
+
+  public ManagedWorkflowServiceStubs(WorkflowServiceStubs serviceStubs) {
+    this.workflowServiceStubs = serviceStubs;
+  }
+
+  @Override
+  public void close() {
+    try {
+      workflowServiceStubs.getOptions().getMetricsScope().close();
+    }
+    catch (Exception e) {
+      log.error("Exception occurred while closing metrics scope", e);
+    }
+    try {
+      workflowServiceStubs.shutdown();
+    }
+    catch (Exception e) {
+      log.error("Exception occurred while shutting down WorkflowServiceStubs", 
e);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
index 98d0379b65..9a6446d95c 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java
@@ -49,6 +49,7 @@ import org.apache.gobblin.runtime.locks.FileBasedJobLock;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import 
org.apache.gobblin.temporal.workflows.client.TemporalWorkflowClientFactory;
+import 
org.apache.gobblin.temporal.workflows.service.ManagedWorkflowServiceStubs;
 import org.apache.gobblin.util.JobLauncherUtils;
 
 import static org.mockito.Mockito.mock;
@@ -83,6 +84,7 @@ public class GobblinTemporalJobLauncherTest {
   @BeforeClass
   public void setUp() throws Exception {
     mockServiceStubs = mock(WorkflowServiceStubs.class);
+    ManagedWorkflowServiceStubs managedWorkflowServiceStubs = new 
ManagedWorkflowServiceStubs(mockServiceStubs);
     mockClient = mock(WorkflowClient.class);
     mockExecutionInfo = mock(WorkflowExecutionInfo.class);
     DescribeWorkflowExecutionResponse mockResponse = 
mock(DescribeWorkflowExecutionResponse.class);
@@ -93,7 +95,7 @@ public class GobblinTemporalJobLauncherTest {
 
     mockWorkflowClientFactory = 
Mockito.mockStatic(TemporalWorkflowClientFactory.class);
     mockWorkflowClientFactory.when(() -> 
TemporalWorkflowClientFactory.createServiceInstance(Mockito.anyString()))
-        .thenReturn(mockServiceStubs);
+        .thenReturn(managedWorkflowServiceStubs);
     mockWorkflowClientFactory.when(() -> 
TemporalWorkflowClientFactory.createClientInstance(Mockito.any(), 
Mockito.anyString()))
         .thenReturn(mockClient);
 

Reply via email to