This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bdc7007 [GOBBLIN-1996] Add ability for Yarn app to terminate on finishing of temporal flow (#3865)
64bdc7007 is described below

commit 64bdc7007f16f8b87cfe5aa1b2e263e8c8573bf1
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Feb 13 11:48:15 2024 -0800

    [GOBBLIN-1996] Add ability for Yarn app to terminate on finishing of temporal flow (#3865)
    
    Terminate the Yarn application when the Temporal flow finishes
---
 .../runtime/app/ServiceBasedAppLauncher.java       | 27 +++++-
 .../cluster/GobblinTemporalClusterManager.java     | 39 ++-------
 .../ServiceBasedAppLauncherWithoutMetrics.java     | 39 +++++++++
 .../temporal/ddm/NoWorkUnitsInfiniteSource.java    | 97 ++++++++++++++++++++++
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |  6 +-
 .../temporal/joblauncher/GobblinJobLauncher.java   | 33 ++++----
 .../joblauncher/GobblinTemporalJobLauncher.java    | 17 +++-
 .../joblauncher/GobblinTemporalJobScheduler.java   |  3 +-
 .../launcher/GenArbitraryLoadJobLauncher.java      | 13 ++-
 .../helloworld/HelloWorldJobLauncher.java          |  6 +-
 .../workflows/metrics/TemporalEventTimer.java      |  4 +-
 .../apache/gobblin/temporal/yarn/YarnService.java  | 20 +++--
 12 files changed, 236 insertions(+), 68 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java
index 06f8113c0..b31c00657 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -44,6 +46,7 @@ import 
org.apache.gobblin.runtime.services.JMXReportingService;
 import org.apache.gobblin.runtime.services.MetricsReportingService;
 import org.apache.gobblin.util.ApplicationLauncherUtils;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -76,6 +79,13 @@ public class ServiceBasedAppLauncher implements 
ApplicationLauncher {
    */
   public static final String APP_NAME = "app.name";
 
+  /**
+   * The number of seconds before timing out when waiting for all the services 
that were added to launcher to the start. The default value is {@link 
#DEFAULT_STARTUP_TIMEOUT}
+   * NOTE: A timeout does not cause the cluster manager to kill the 
application, it only logs a warning and proceeds.
+   */
+  public static final String STARTUP_TIMEOUT_SECONDS = 
"app.start.waitForServicesTimeout.seconds";
+  public static final Duration DEFAULT_STARTUP_TIMEOUT = 
ChronoUnit.FOREVER.getDuration();
+
   /**
    * The number of seconds to wait for the application to stop, the default 
value is {@link #DEFAULT_APP_STOP_TIME_SECONDS}
    */
@@ -91,6 +101,7 @@ public class ServiceBasedAppLauncher implements 
ApplicationLauncher {
   private static final Logger LOG = 
LoggerFactory.getLogger(ServiceBasedAppLauncher.class);
 
   private final int stopTime;
+  private final Duration startupTimeout;
   private final String appId;
   private final List<Service> services;
 
@@ -101,6 +112,9 @@ public class ServiceBasedAppLauncher implements 
ApplicationLauncher {
 
   public ServiceBasedAppLauncher(Properties properties, String appName) throws 
Exception {
     this.stopTime = 
Integer.parseInt(properties.getProperty(APP_STOP_TIME_SECONDS, 
DEFAULT_APP_STOP_TIME_SECONDS));
+    this.startupTimeout = properties.containsKey(STARTUP_TIMEOUT_SECONDS)
+        ? Duration.ofSeconds(PropertiesUtils.getPropAsInt(properties, 
STARTUP_TIMEOUT_SECONDS, 0))
+        : DEFAULT_STARTUP_TIMEOUT;
     this.appId = ApplicationLauncherUtils.newAppId(appName);
     this.services = new ArrayList<>();
 
@@ -163,9 +177,18 @@ public class ServiceBasedAppLauncher implements 
ApplicationLauncher {
     });
 
     LOG.info("Starting the Gobblin application and all its associated 
Services");
+    for (Service service : this.services) {
+      LOG.info("Starting service " + service.getClass().getSimpleName());
+    }
 
     // Start the application
-    this.serviceManager.startAsync().awaitHealthy();
+    try {
+      
this.serviceManager.startAsync().awaitHealthy(startupTimeout.getSeconds(), 
TimeUnit.SECONDS);
+    } catch (TimeoutException te) {
+      LOG.error("Timeout of {} seconds exceeded for starting services in 
service manager. Proceeding anyway to unblock shutdown hook",
+          startupTimeout.getSeconds(), te);
+    }
+    LOG.info("Finished starting all services");
   }
 
   /**
@@ -250,7 +273,7 @@ public class ServiceBasedAppLauncher implements 
ApplicationLauncher {
     }
   }
 
-  private void addMetricsService(Properties properties) {
+  protected void addMetricsService(Properties properties) {
     if (GobblinMetrics.isEnabled(properties)) {
       addService(new MetricsReportingService(properties, this.appId));
     }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
index 49a91a9a7..a460bb420 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
@@ -33,8 +33,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -81,8 +79,6 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Slf4j
 public class GobblinTemporalClusterManager implements ApplicationLauncher, 
StandardMetricsBridge, LeadershipChangeAwareComponent {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinTemporalClusterManager.class);
-
   private StopStatus stopStatus = new StopStatus(false);
 
   protected ServiceBasedAppLauncher applicationLauncher;
@@ -98,14 +94,6 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
 
   protected final String applicationId;
 
-  // thread used to keep process up for an idle controller
-  private Thread idleProcessThread;
-
-  // set to true to stop the idle process thread
-  private volatile boolean stopIdleProcessThread = false;
-
-  private final boolean isStandaloneMode;
-
   @Getter
   private MutableJobCatalog jobCatalog;
   @Getter
@@ -129,15 +117,12 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
     this.config = GobblinClusterUtils.addDynamicConfig(sysConfig);
 
     this.clusterName = clusterName;
-    this.isStandaloneMode = ConfigUtils.getBoolean(this.config, 
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
-        GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
-
     this.applicationId = applicationId;
 
     this.fs = GobblinClusterUtils.buildFileSystem(this.config, new 
Configuration());
     this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
         : GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, 
this.fs, clusterName, applicationId);
-    LOGGER.info("Configured GobblinTemporalClusterManager work dir to: {}", 
this.appWorkDir);
+    log.info("Configured GobblinTemporalClusterManager work dir to: {}", 
this.appWorkDir);
 
     initializeAppLauncherAndServices();
   }
@@ -152,7 +137,7 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
     if (!properties.contains(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS)) {
       properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, 
Long.toString(300));
     }
-    this.applicationLauncher = new ServiceBasedAppLauncher(properties, 
this.clusterName);
+    this.applicationLauncher = new 
ServiceBasedAppLauncherWithoutMetrics(properties, this.clusterName);
 
     // create a job catalog for keeping track of received jobs if a job config 
path is specified
     if 
(this.config.hasPath(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX
@@ -201,14 +186,17 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
    */
   private void stopAppLauncherAndServices() {
     try {
+      log.info("Stopping the Gobblin cluster application launcher");
       this.applicationLauncher.stop();
     } catch (ApplicationException ae) {
-      LOGGER.error("Error while stopping Gobblin Cluster application 
launcher", ae);
+      log.error("Error while stopping Gobblin Cluster application launcher", 
ae);
     }
 
+    log.info("Stopping the Gobblin cluster job catalog");
     if (this.jobCatalog instanceof Service) {
       ((Service) this.jobCatalog).stopAsync().awaitTerminated();
     }
+    log.info("Stopped the Gobblin cluster job catalog");
   }
 
 
@@ -218,7 +206,7 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
   @Override
   public void start() {
     // temporal workflow
-    LOGGER.info("Starting the Gobblin Temporal Cluster Manager");
+    log.info("Starting the Gobblin Temporal Cluster Manager");
 
     this.eventBus.register(this);
     startAppLauncherAndServices();
@@ -236,18 +224,9 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
 
     this.stopStatus.setStopInprogress(true);
 
-    LOGGER.info("Stopping the Gobblin Cluster Manager");
-
-    if (this.idleProcessThread != null) {
-      try {
-        this.idleProcessThread.join();
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-      }
-    }
+    log.info("Stopping the Gobblin Cluster Manager");
 
     stopAppLauncherAndServices();
-
   }
 
   private GobblinTemporalJobScheduler buildGobblinTemporalJobScheduler(Config 
sysConfig, Path appWorkDir,
@@ -338,7 +317,7 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
         isStandaloneClusterManager = 
Boolean.parseBoolean(cmd.getOptionValue(GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE,
 "false"));
       }
 
-      LOGGER.info(JvmUtils.getJvmInputArguments());
+      log.info(JvmUtils.getJvmInputArguments());
       Config config = ConfigFactory.load();
 
       if 
(cmd.hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME))
 {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java
new file mode 100644
index 000000000..89e296080
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+
+
+/**
+ * {@link ServiceBasedAppLauncher} that does not add metrics service. This is 
different from deactivating the flag
+ * {@link 
org.apache.gobblin.configuration.ConfigurationKeys#METRICS_ENABLED_KEY} because 
this will only not initialize
+ * the metrics service in the cluster manager but still allow for {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}
+ */
+public class ServiceBasedAppLauncherWithoutMetrics extends 
ServiceBasedAppLauncher {
+  public ServiceBasedAppLauncherWithoutMetrics(Properties properties, String 
appName)
+      throws Exception {
+    super(properties, appName);
+  }
+
+  @Override
+  protected void addMetricsService(Properties properties) {
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/NoWorkUnitsInfiniteSource.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/NoWorkUnitsInfiniteSource.java
new file mode 100644
index 000000000..4f2b393a3
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/NoWorkUnitsInfiniteSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.eventbus.EventBus;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
+import org.apache.gobblin.source.InfiniteSource;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+
+/**
+ * An implementation of {@link InfiniteSource} that does not generate any 
workunits. This is helpful when the {@link io.temporal.workflow.Workflow}
+ * is driven by the job launcher and not the source. I.e. we want the 
discovery to be triggered on a {@link io.temporal.worker.Worker} and not the
+ * {@link org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager}
+ *
+ * This class also implements the {@link InfiniteSource} to provide hooks for 
communicating with the
+ * {@link org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher} 
via the {@link EventBus}.
+ */
+@Slf4j
+public class NoWorkUnitsInfiniteSource implements InfiniteSource {
+
+  private final EventBus eventBus = new 
EventBus(this.getClass().getSimpleName());
+
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    return Arrays.asList(WorkUnit.createEmpty());
+  }
+
+  @Override
+  public Extractor getExtractor(WorkUnitState state)
+      throws IOException {
+    // no-op stub extractor
+    return new InstrumentedExtractor(state) {
+      @Override
+      public Object getSchema()
+          throws IOException {
+        return null;
+      }
+
+      @Override
+      public long getExpectedRecordCount() {
+        return 0;
+      }
+
+      @Override
+      public long getHighWatermark() {
+        return 0;
+      }
+
+      @Override
+      protected RecordEnvelope readRecordEnvelopeImpl() throws 
DataRecordException, IOException {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public void shutdown(SourceState state) {
+  }
+
+  @Override
+  public boolean isEarlyStopped() {
+    return InfiniteSource.super.isEarlyStopped();
+  }
+
+  @Override
+  public EventBus getEventBus() {
+    return this.eventBus;
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index f7040f79f..ee6fe7f80 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.eventbus.EventBus;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.client.WorkflowOptions;
@@ -69,9 +70,10 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
       Properties jobProps,
       Path appWorkDir,
       List<? extends Tag<?>> metadataTags,
-      ConcurrentHashMap<String, Boolean> runningMap
+      ConcurrentHashMap<String, Boolean> runningMap,
+      EventBus eventBus
   ) throws Exception {
-    super(jobProps, appWorkDir, metadataTags, runningMap);
+    super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
   }
 
   @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index 9ff8b6bea..c61d8c72a 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -17,14 +17,21 @@
 
 package org.apache.gobblin.temporal.joblauncher;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
 import javax.annotation.Nullable;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -37,7 +44,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.CountEventBuilder;
 import org.apache.gobblin.metrics.event.JobEvent;
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.rest.LauncherTypeEnum;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobException;
@@ -50,10 +56,6 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ParallelRunner;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 /**
  * An implementation of {@link JobLauncher} that launches a Gobblin job using 
the Temporal task framework.
  * Most of this code is lifted from {@link 
org.apache.gobblin.cluster.GobblinHelixJobLauncher} and maybe in the future
@@ -69,6 +71,7 @@ import org.apache.hadoop.fs.Path;
 @Alpha
 @Slf4j
 public abstract class GobblinJobLauncher extends AbstractJobLauncher {
+  protected final EventBus eventBus;
   protected static final String WORK_UNIT_FILE_EXTENSION = ".wu";
   protected final FileSystem fs;
   protected final Path appWorkDir;
@@ -87,7 +90,7 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
 
 
   public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
-      List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap)
+      List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> 
runningMap, EventBus eventbus)
       throws Exception {
     super(jobProps, HelixUtils.initBaseEventTags(jobProps, metadataTags));
     log.debug("GobblinJobLauncher: jobProps {}, appWorkDir {}", jobProps, 
appWorkDir);
@@ -101,6 +104,7 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
 
     this.stateSerDeRunnerThreads = 
Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
         Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
+    this.eventBus = eventbus;
 
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
@@ -146,13 +150,9 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
       // Start the output TaskState collector service
       this.taskStateCollectorService.startAsync().awaitRunning();
 
-      TimingEvent jobSubmissionTimer =
-          
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION);
-
       synchronized (this.cancellationRequest) {
         if (!this.cancellationRequested) {
           submitJob(workUnits);
-          jobSubmissionTimer.stop();
           log.info(String.format("Submitted job %s", 
this.jobContext.getJobId()));
           this.jobSubmitted = true;
         } else {
@@ -160,9 +160,7 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
         }
       }
 
-      TimingEvent jobRunTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN);
       waitJob();
-      jobRunTimer.stop();
       log.info(String.format("Job %s completed", this.jobContext.getJobId()));
     } finally {
       // The last iteration of output TaskState collecting will run when the 
collector service gets stopped
@@ -206,6 +204,8 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
         cancelJob(jobListener);
       }
     } finally {
+      handleLaunchFinalization();
+
       if (isLaunched) {
         if (this.runningMap.replace(this.jobContext.getJobName(), true, 
false)) {
           log.info("Job {} is done, remove from running map.", 
this.jobContext.getJobId());
@@ -218,6 +218,9 @@ public abstract class GobblinJobLauncher extends 
AbstractJobLauncher {
     }
   }
 
+  protected void handleLaunchFinalization() {
+  }
+
   /**
    * This method looks silly at first glance but exists for a reason.
    *
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index f7d7255ed..914b3aa90 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -25,12 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
+import com.google.common.eventbus.EventBus;
+
 import io.temporal.client.WorkflowClient;
 import io.temporal.serviceclient.WorkflowServiceStubs;
 import io.temporal.workflow.Workflow;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -64,9 +67,9 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
   protected String queueName;
 
   public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
-                                    List<? extends Tag<?>> metadataTags, 
ConcurrentHashMap<String, Boolean> runningMap)
+                                    List<? extends Tag<?>> metadataTags, 
ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
           throws Exception {
-    super(jobProps, appWorkDir, metadataTags, runningMap);
+    super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
     log.debug("GobblinTemporalJobLauncher: jobProps {}, appWorkDir {}", 
jobProps, appWorkDir);
 
     String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
@@ -80,6 +83,16 @@ public abstract class GobblinTemporalJobLauncher extends 
GobblinJobLauncher {
     startCancellationExecutor();
   }
 
+  @Override
+  protected void handleLaunchFinalization() {
+    // NOTE: This code only makes sense when there is 1 source / workflow 
being launched per application for Temporal. This is a stop-gap
+    // for achieving batch job behavior. Given the current constraints of yarn 
applications requiring a static proxy user
+    // during application creation, it is not possible to have multiple 
workflows running in the same application.
+    // and so it makes sense to just kill the job after this is completed
+    log.info("Requesting the AM to shutdown after the job {} completed", 
this.jobContext.getJobId());
+    eventBus.post(new ClusterManagerShutdownRequest());
+  }
+
   /**
    * Submit a job to run.
    */
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index d93e2a670..76629aa68 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -184,7 +184,8 @@ public class GobblinTemporalJobScheduler extends 
JobScheduler implements Standar
     return GobblinConstructorUtils.invokeLongestConstructor(jobLauncherClass, 
combinedProps,
             this.appWorkDir,
             this.metadataTags,
-            this.jobRunningMap);
+            this.jobRunningMap,
+            this.eventBus);
   }
 
   @Subscribe
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
index 62a7e4cfe..b1f103c7b 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
@@ -21,10 +21,14 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
-import lombok.extern.slf4j.Slf4j;
-import io.temporal.client.WorkflowOptions;
+
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.eventbus.EventBus;
+
+import io.temporal.client.WorkflowOptions;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
@@ -62,9 +66,10 @@ public class GenArbitraryLoadJobLauncher extends 
GobblinTemporalJobLauncher {
       Properties jobProps,
       Path appWorkDir,
       List<? extends Tag<?>> metadataTags,
-      ConcurrentHashMap<String, Boolean> runningMap
+      ConcurrentHashMap<String, Boolean> runningMap,
+      EventBus eventBus
   ) throws Exception {
-    super(jobProps, appWorkDir, metadataTags, runningMap);
+    super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
   }
 
   @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 5d196fae4..737bd461f 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.eventbus.EventBus;
+
 import io.temporal.client.WorkflowOptions;
 import lombok.extern.slf4j.Slf4j;
 
@@ -48,9 +50,9 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 @Slf4j
 public class HelloWorldJobLauncher extends GobblinTemporalJobLauncher {
   public HelloWorldJobLauncher(Properties jobProps, Path appWorkDir, List<? 
extends Tag<?>> metadataTags,
-      ConcurrentHashMap<String, Boolean> runningMap)
+      ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
       throws Exception {
-    super(jobProps, appWorkDir, metadataTags, runningMap);
+    super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
   }
 
   @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index 0f68f1132..a22af9d51 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -70,7 +70,9 @@ public class TemporalEventTimer implements EventTimer {
   }
 
   public static class Factory {
-    private static final ActivityOptions DEFAULT_OPTS = 
ActivityOptions.newBuilder().build();
+    private static final ActivityOptions DEFAULT_OPTS = 
ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(Duration.ofHours(12)) // maximum timeout for 
the actual event submission to kafka, waiting out a kafka outage
+        .build();
     private final SubmitGTEActivity submitGTEActivity;
     private final EventSubmitterContext eventSubmitterContext;
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index ab6954508..b124063bc 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -92,7 +92,6 @@ import 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricReporterException;
 import org.apache.gobblin.metrics.MultiReporterException;
@@ -207,11 +206,12 @@ class YarnService extends AbstractIdleService {
 
     this.eventBus = eventBus;
 
-    this.gobblinMetrics = 
config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
-        Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();
-
-    this.eventSubmitter = 
config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
-        Optional.of(buildEventSubmitter()) : Optional.<EventSubmitter>absent();
+    // Gobblin metrics have been disabled to allow testing kafka integration 
without having to setup
+    // the metrics reporting topic. For this Temporal based impl of the 
Cluster Manager,
+    // Kafka will only be used for GobblinTrackingEvents for external 
monitoring. This choice was made because metrics
+    // emitted MetricReport are non-critical and not needed for the cluster / 
job execution correctness
+    this.gobblinMetrics = Optional.<GobblinMetrics>absent();
+    this.eventSubmitter = Optional.<EventSubmitter>absent();
 
     this.yarnConfiguration = yarnConfiguration;
     this.fs = fs;
@@ -685,9 +685,11 @@ class YarnService extends AbstractIdleService {
       LOGGER.info("Adding instance {} to the pool of unused instances", 
completedInstanceName);
       this.unusedHelixInstanceNames.add(completedInstanceName);
 
-      // NOTE: logic for handling container failure is removed because 
original implementation relies on the auto scaling manager
-      // to control the number of containers by polling helix for the current 
number of tasks
-      // Without that integration, that code requests too many containers when 
there are exceptions and overloads yarn
+      /**
+       * NOTE: logic for handling container failure is removed because {@link 
#YarnService} relies on the auto scaling manager
+       * to control the number of containers by polling helix for the current 
number of tasks
+       * Without that integration, that code requests too many containers when 
there are exceptions and overloads yarn
+       */
     }
   }
 

Reply via email to