This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 64bdc7007 [GOBBLIN-1996] Add ability for Yarn app to terminate on
finishing of temporal flow (#3865)
64bdc7007 is described below
commit 64bdc7007f16f8b87cfe5aa1b2e263e8c8573bf1
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Feb 13 11:48:15 2024 -0800
[GOBBLIN-1996] Add ability for Yarn app to terminate on finishing of
temporal flow (#3865)
Terminate on finish of temporal flow
---
.../runtime/app/ServiceBasedAppLauncher.java | 27 +++++-
.../cluster/GobblinTemporalClusterManager.java | 39 ++-------
.../ServiceBasedAppLauncherWithoutMetrics.java | 39 +++++++++
.../temporal/ddm/NoWorkUnitsInfiniteSource.java | 97 ++++++++++++++++++++++
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 6 +-
.../temporal/joblauncher/GobblinJobLauncher.java | 33 ++++----
.../joblauncher/GobblinTemporalJobLauncher.java | 17 +++-
.../joblauncher/GobblinTemporalJobScheduler.java | 3 +-
.../launcher/GenArbitraryLoadJobLauncher.java | 13 ++-
.../helloworld/HelloWorldJobLauncher.java | 6 +-
.../workflows/metrics/TemporalEventTimer.java | 4 +-
.../apache/gobblin/temporal/yarn/YarnService.java | 20 +++--
12 files changed, 236 insertions(+), 68 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java
index 06f8113c0..b31c00657 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -44,6 +46,7 @@ import
org.apache.gobblin.runtime.services.JMXReportingService;
import org.apache.gobblin.runtime.services.MetricsReportingService;
import org.apache.gobblin.util.ApplicationLauncherUtils;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -76,6 +79,13 @@ public class ServiceBasedAppLauncher implements
ApplicationLauncher {
*/
public static final String APP_NAME = "app.name";
+ /**
+ * The number of seconds to wait for all the services added to the launcher to start (read as an int); if unset, {@link #DEFAULT_STARTUP_TIMEOUT} (effectively no timeout) is used.
+ * NOTE: A timeout does not fail startup or kill the application; an error is logged and startup proceeds.
+ */
+ public static final String STARTUP_TIMEOUT_SECONDS = "app.start.waitForServicesTimeout.seconds";
+ public static final Duration DEFAULT_STARTUP_TIMEOUT = ChronoUnit.FOREVER.getDuration(); // FOREVER's duration: unbounded wait by default
+
/**
* The number of seconds to wait for the application to stop, the default
value is {@link #DEFAULT_APP_STOP_TIME_SECONDS}
*/
@@ -91,6 +101,7 @@ public class ServiceBasedAppLauncher implements
ApplicationLauncher {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceBasedAppLauncher.class);
private final int stopTime;
+ private final Duration startupTimeout;
private final String appId;
private final List<Service> services;
@@ -101,6 +112,9 @@ public class ServiceBasedAppLauncher implements
ApplicationLauncher {
public ServiceBasedAppLauncher(Properties properties, String appName) throws
Exception {
this.stopTime =
Integer.parseInt(properties.getProperty(APP_STOP_TIME_SECONDS,
DEFAULT_APP_STOP_TIME_SECONDS));
+ this.startupTimeout = properties.containsKey(STARTUP_TIMEOUT_SECONDS)
+ ? Duration.ofSeconds(PropertiesUtils.getPropAsInt(properties,
STARTUP_TIMEOUT_SECONDS, 0))
+ : DEFAULT_STARTUP_TIMEOUT;
this.appId = ApplicationLauncherUtils.newAppId(appName);
this.services = new ArrayList<>();
@@ -163,9 +177,18 @@ public class ServiceBasedAppLauncher implements
ApplicationLauncher {
});
LOG.info("Starting the Gobblin application and all its associated
Services");
+ for (Service service : this.services) {
+ LOG.info("Starting service " + service.getClass().getSimpleName());
+ }
// Start the application
- this.serviceManager.startAsync().awaitHealthy();
+ try {
+
this.serviceManager.startAsync().awaitHealthy(startupTimeout.getSeconds(),
TimeUnit.SECONDS);
+ } catch (TimeoutException te) {
+ LOG.error("Timeout of {} seconds exceeded for starting services in
service manager. Proceeding anyway to unblock shutdown hook",
+ startupTimeout.getSeconds(), te);
+ }
+ LOG.info("Finished starting all services");
}
/**
@@ -250,7 +273,7 @@ public class ServiceBasedAppLauncher implements
ApplicationLauncher {
}
}
- private void addMetricsService(Properties properties) {
+ protected void addMetricsService(Properties properties) {
if (GobblinMetrics.isEnabled(properties)) {
addService(new MetricsReportingService(properties, this.appId));
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
index 49a91a9a7..a460bb420 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
@@ -33,8 +33,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -81,8 +79,6 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Slf4j
public class GobblinTemporalClusterManager implements ApplicationLauncher,
StandardMetricsBridge, LeadershipChangeAwareComponent {
- private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinTemporalClusterManager.class);
-
private StopStatus stopStatus = new StopStatus(false);
protected ServiceBasedAppLauncher applicationLauncher;
@@ -98,14 +94,6 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
protected final String applicationId;
- // thread used to keep process up for an idle controller
- private Thread idleProcessThread;
-
- // set to true to stop the idle process thread
- private volatile boolean stopIdleProcessThread = false;
-
- private final boolean isStandaloneMode;
-
@Getter
private MutableJobCatalog jobCatalog;
@Getter
@@ -129,15 +117,12 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
this.config = GobblinClusterUtils.addDynamicConfig(sysConfig);
this.clusterName = clusterName;
- this.isStandaloneMode = ConfigUtils.getBoolean(this.config,
GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
- GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
-
this.applicationId = applicationId;
this.fs = GobblinClusterUtils.buildFileSystem(this.config, new
Configuration());
this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
: GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config,
this.fs, clusterName, applicationId);
- LOGGER.info("Configured GobblinTemporalClusterManager work dir to: {}",
this.appWorkDir);
+ log.info("Configured GobblinTemporalClusterManager work dir to: {}",
this.appWorkDir);
initializeAppLauncherAndServices();
}
@@ -152,7 +137,7 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
if (!properties.contains(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS)) {
properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS,
Long.toString(300));
}
- this.applicationLauncher = new ServiceBasedAppLauncher(properties,
this.clusterName);
+ this.applicationLauncher = new
ServiceBasedAppLauncherWithoutMetrics(properties, this.clusterName);
// create a job catalog for keeping track of received jobs if a job config
path is specified
if
(this.config.hasPath(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX
@@ -201,14 +186,17 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
*/
private void stopAppLauncherAndServices() {
try {
+ log.info("Stopping the Gobblin cluster application launcher");
this.applicationLauncher.stop();
} catch (ApplicationException ae) {
- LOGGER.error("Error while stopping Gobblin Cluster application
launcher", ae);
+ log.error("Error while stopping Gobblin Cluster application launcher",
ae);
}
+ log.info("Stopping the Gobblin cluster job catalog");
if (this.jobCatalog instanceof Service) {
((Service) this.jobCatalog).stopAsync().awaitTerminated();
}
+ log.info("Stopped the Gobblin cluster job catalog");
}
@@ -218,7 +206,7 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
@Override
public void start() {
// temporal workflow
- LOGGER.info("Starting the Gobblin Temporal Cluster Manager");
+ log.info("Starting the Gobblin Temporal Cluster Manager");
this.eventBus.register(this);
startAppLauncherAndServices();
@@ -236,18 +224,9 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
this.stopStatus.setStopInprogress(true);
- LOGGER.info("Stopping the Gobblin Cluster Manager");
-
- if (this.idleProcessThread != null) {
- try {
- this.idleProcessThread.join();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
+ log.info("Stopping the Gobblin Cluster Manager");
stopAppLauncherAndServices();
-
}
private GobblinTemporalJobScheduler buildGobblinTemporalJobScheduler(Config
sysConfig, Path appWorkDir,
@@ -338,7 +317,7 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
isStandaloneClusterManager =
Boolean.parseBoolean(cmd.getOptionValue(GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE,
"false"));
}
- LOGGER.info(JvmUtils.getJvmInputArguments());
+ log.info(JvmUtils.getJvmInputArguments());
Config config = ConfigFactory.load();
if
(cmd.hasOption(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME))
{
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java
new file mode 100644
index 000000000..89e296080
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.cluster;
+
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+
+
+/**
+ * {@link ServiceBasedAppLauncher} that never registers the metrics reporting service, because {@link #addMetricsService(Properties)} is a no-op.
+ * This is different from deactivating the flag {@link org.apache.gobblin.configuration.ConfigurationKeys#METRICS_ENABLED_KEY}: it only skips
+ * initializing the metrics service in the cluster manager, while still allowing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s to be emitted.
+ */
+public class ServiceBasedAppLauncherWithoutMetrics extends ServiceBasedAppLauncher {
+ public ServiceBasedAppLauncherWithoutMetrics(Properties properties, String appName)
+ throws Exception {
+ super(properties, appName);
+ }
+
+ @Override
+ protected void addMetricsService(Properties properties) { // intentionally empty: skip MetricsReportingService registration
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/NoWorkUnitsInfiniteSource.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/NoWorkUnitsInfiniteSource.java
new file mode 100644
index 000000000..4f2b393a3
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/NoWorkUnitsInfiniteSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.eventbus.EventBus;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
+import org.apache.gobblin.source.InfiniteSource;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+
+/**
+ * An {@link InfiniteSource} whose {@link #getWorkunits(SourceState)} returns a single empty {@link WorkUnit} and whose extractor emits no records.
+ * This is helpful when the {@link io.temporal.workflow.Workflow} is driven by the job launcher rather than the source, i.e. when work discovery
+ * should be triggered on a {@link io.temporal.worker.Worker} and not on the {@link org.apache.gobblin.temporal.cluster.GobblinTemporalClusterManager}.
+ *
+ * Implementing {@link InfiniteSource} also provides an {@link EventBus} (see {@link #getEventBus()}) as a hook for communicating with
+ * the {@link org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher}.
+ */
+@Slf4j
+public class NoWorkUnitsInfiniteSource implements InfiniteSource {
+
+ private final EventBus eventBus = new EventBus(this.getClass().getSimpleName()); // bus identifier is the runtime class's simple name
+
+ @Override
+ public List<WorkUnit> getWorkunits(SourceState state) {
+ return Arrays.asList(WorkUnit.createEmpty()); // NOTE(review): one empty placeholder rather than an empty list, despite the class name -- presumably so the launcher still proceeds; confirm
+ }
+
+ @Override
+ public Extractor getExtractor(WorkUnitState state)
+ throws IOException {
+ // no-op stub extractor
+ return new InstrumentedExtractor(state) {
+ @Override
+ public Object getSchema()
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public long getExpectedRecordCount() {
+ return 0;
+ }
+
+ @Override
+ public long getHighWatermark() {
+ return 0;
+ }
+
+ @Override
+ protected RecordEnvelope readRecordEnvelopeImpl() throws DataRecordException, IOException {
+ return null; // never produces a record; NOTE(review): presumably null is treated as end-of-stream -- confirm against InstrumentedExtractor
+ }
+ };
+ }
+
+ @Override
+ public void shutdown(SourceState state) { // no-op: this source holds nothing that needs releasing
+ }
+
+ @Override
+ public boolean isEarlyStopped() { // only delegates to the interface default, so this override is redundant
+ return InfiniteSource.super.isEarlyStopped();
+ }
+
+ @Override
+ public EventBus getEventBus() {
+ return this.eventBus;
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index f7040f79f..ee6fe7f80 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;
+import com.google.common.eventbus.EventBus;
import com.typesafe.config.ConfigFactory;
import io.temporal.client.WorkflowOptions;
@@ -69,9 +70,10 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
Properties jobProps,
Path appWorkDir,
List<? extends Tag<?>> metadataTags,
- ConcurrentHashMap<String, Boolean> runningMap
+ ConcurrentHashMap<String, Boolean> runningMap,
+ EventBus eventBus
) throws Exception {
- super(jobProps, appWorkDir, metadataTags, runningMap);
+ super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
index 9ff8b6bea..c61d8c72a 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
@@ -17,14 +17,21 @@
package org.apache.gobblin.temporal.joblauncher;
-import com.google.common.annotations.VisibleForTesting;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +44,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.CountEventBuilder;
import org.apache.gobblin.metrics.event.JobEvent;
-import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.rest.LauncherTypeEnum;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobException;
@@ -50,10 +56,6 @@ import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ParallelRunner;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
/**
* An implementation of {@link JobLauncher} that launches a Gobblin job using
the Temporal task framework.
* Most of this code is lifted from {@link
org.apache.gobblin.cluster.GobblinHelixJobLauncher} and maybe in the future
@@ -69,6 +71,7 @@ import org.apache.hadoop.fs.Path;
@Alpha
@Slf4j
public abstract class GobblinJobLauncher extends AbstractJobLauncher {
+ protected final EventBus eventBus;
protected static final String WORK_UNIT_FILE_EXTENSION = ".wu";
protected final FileSystem fs;
protected final Path appWorkDir;
@@ -87,7 +90,7 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
- List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean>
runningMap)
+ List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean>
runningMap, EventBus eventbus)
throws Exception {
super(jobProps, HelixUtils.initBaseEventTags(jobProps, metadataTags));
log.debug("GobblinJobLauncher: jobProps {}, appWorkDir {}", jobProps,
appWorkDir);
@@ -101,6 +104,7 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
this.stateSerDeRunnerThreads =
Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
+ this.eventBus = eventbus;
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
.withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
ConfigValueFactory.fromAnyRef(
@@ -146,13 +150,9 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();
- TimingEvent jobSubmissionTimer =
-
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION);
-
synchronized (this.cancellationRequest) {
if (!this.cancellationRequested) {
submitJob(workUnits);
- jobSubmissionTimer.stop();
log.info(String.format("Submitted job %s",
this.jobContext.getJobId()));
this.jobSubmitted = true;
} else {
@@ -160,9 +160,7 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
}
}
- TimingEvent jobRunTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_RUN);
waitJob();
- jobRunTimer.stop();
log.info(String.format("Job %s completed", this.jobContext.getJobId()));
} finally {
// The last iteration of output TaskState collecting will run when the
collector service gets stopped
@@ -206,6 +204,8 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
cancelJob(jobListener);
}
} finally {
+ handleLaunchFinalization();
+
if (isLaunched) {
if (this.runningMap.replace(this.jobContext.getJobName(), true,
false)) {
log.info("Job {} is done, remove from running map.",
this.jobContext.getJobId());
@@ -218,6 +218,9 @@ public abstract class GobblinJobLauncher extends
AbstractJobLauncher {
}
}
+ protected void handleLaunchFinalization() { // extension hook called in the launch path's finally block, before the running map is updated; no-op by default
+ }
+
/**
* This method looks silly at first glance but exists for a reason.
*
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
index f7d7255ed..914b3aa90 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java
@@ -25,12 +25,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
+import com.google.common.eventbus.EventBus;
+
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -64,9 +67,9 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
protected String queueName;
public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
- List<? extends Tag<?>> metadataTags,
ConcurrentHashMap<String, Boolean> runningMap)
+ List<? extends Tag<?>> metadataTags,
ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
throws Exception {
- super(jobProps, appWorkDir, metadataTags, runningMap);
+ super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
log.debug("GobblinTemporalJobLauncher: jobProps {}, appWorkDir {}",
jobProps, appWorkDir);
String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
@@ -80,6 +83,16 @@ public abstract class GobblinTemporalJobLauncher extends
GobblinJobLauncher {
startCancellationExecutor();
}
+ @Override
+ protected void handleLaunchFinalization() {
+ // NOTE: This only makes sense when exactly 1 source / workflow is launched per application for Temporal. It is a stop-gap
+ // for achieving batch job behavior: yarn applications currently require a static proxy user at creation time, so multiple
+ // workflows cannot run in the same application, and a cluster manager shutdown is therefore requested once this job completes.
+ // NOTE(review): the base class calls this from a finally block, so shutdown is also requested after a failed or cancelled job -- confirm intended.
+ log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId());
+ eventBus.post(new ClusterManagerShutdownRequest());
+ }
+
/**
* Submit a job to run.
*/
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
index d93e2a670..76629aa68 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java
@@ -184,7 +184,8 @@ public class GobblinTemporalJobScheduler extends
JobScheduler implements Standar
return GobblinConstructorUtils.invokeLongestConstructor(jobLauncherClass,
combinedProps,
this.appWorkDir,
this.metadataTags,
- this.jobRunningMap);
+ this.jobRunningMap,
+ this.eventBus);
}
@Subscribe
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
index 62a7e4cfe..b1f103c7b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/launcher/GenArbitraryLoadJobLauncher.java
@@ -21,10 +21,14 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import lombok.extern.slf4j.Slf4j;
-import io.temporal.client.WorkflowOptions;
+
import org.apache.hadoop.fs.Path;
+import com.google.common.eventbus.EventBus;
+
+import io.temporal.client.WorkflowOptions;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
@@ -62,9 +66,10 @@ public class GenArbitraryLoadJobLauncher extends
GobblinTemporalJobLauncher {
Properties jobProps,
Path appWorkDir,
List<? extends Tag<?>> metadataTags,
- ConcurrentHashMap<String, Boolean> runningMap
+ ConcurrentHashMap<String, Boolean> runningMap,
+ EventBus eventBus
) throws Exception {
- super(jobProps, appWorkDir, metadataTags, runningMap);
+ super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
index 5d196fae4..737bd461f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/HelloWorldJobLauncher.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.fs.Path;
+import com.google.common.eventbus.EventBus;
+
import io.temporal.client.WorkflowOptions;
import lombok.extern.slf4j.Slf4j;
@@ -48,9 +50,9 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@Slf4j
public class HelloWorldJobLauncher extends GobblinTemporalJobLauncher {
public HelloWorldJobLauncher(Properties jobProps, Path appWorkDir, List<?
extends Tag<?>> metadataTags,
- ConcurrentHashMap<String, Boolean> runningMap)
+ ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
throws Exception {
- super(jobProps, appWorkDir, metadataTags, runningMap);
+ super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index 0f68f1132..a22af9d51 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -70,7 +70,9 @@ public class TemporalEventTimer implements EventTimer {
}
public static class Factory {
- private static final ActivityOptions DEFAULT_OPTS =
ActivityOptions.newBuilder().build();
+ private static final ActivityOptions DEFAULT_OPTS =
ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofHours(12)) // maximum timeout for
the actual event submission to kafka, waiting out a kafka outage
+ .build();
private final SubmitGTEActivity submitGTEActivity;
private final EventSubmitterContext eventSubmitterContext;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index ab6954508..b124063bc 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -92,7 +92,6 @@ import
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.MultiReporterException;
@@ -207,11 +206,12 @@ class YarnService extends AbstractIdleService {
this.eventBus = eventBus;
- this.gobblinMetrics =
config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
- Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();
-
- this.eventSubmitter =
config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
- Optional.of(buildEventSubmitter()) : Optional.<EventSubmitter>absent();
+ // Gobblin metrics have been disabled to allow testing kafka integration
without having to setup
+ // the metrics reporting topic. For this Temporal based impl of the
Cluster Manager,
+ // Kafka will only be used for GobblinTrackingEvents for external
monitoring. This choice was made because metrics
+ // emitted MetricReport are non-critical and not needed for the cluster /
job execution correctness
+ this.gobblinMetrics = Optional.<GobblinMetrics>absent();
+ this.eventSubmitter = Optional.<EventSubmitter>absent();
this.yarnConfiguration = yarnConfiguration;
this.fs = fs;
@@ -685,9 +685,11 @@ class YarnService extends AbstractIdleService {
LOGGER.info("Adding instance {} to the pool of unused instances",
completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
- // NOTE: logic for handling container failure is removed because
original implementation relies on the auto scaling manager
- // to control the number of containers by polling helix for the current
number of tasks
- // Without that integration, that code requests too many containers when
there are exceptions and overloads yarn
+ /**
+ * NOTE: logic for handling container failure is removed because {@link
#YarnService} relies on the auto scaling manager
+ * to control the number of containers by polling helix for the current
number of tasks
+ * Without that integration, that code requests too many containers when
there are exceptions and overloads yarn
+ */
}
}