[
https://issues.apache.org/jira/browse/GOBBLIN-1996?focusedWorklogId=902895&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-902895
]
ASF GitHub Bot logged work on GOBBLIN-1996:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 31/Jan/24 20:36
Start Date: 31/Jan/24 20:36
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3865:
URL: https://github.com/apache/gobblin/pull/3865#discussion_r1473396073
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -146,23 +152,17 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();
- TimingEvent jobSubmissionTimer =
- this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION);
Review Comment:
are you removing in conjunction w/ the idea that we don't want noisy metrics
(that you mentioned)? if so, are we certain the "metrics disabling" code path
is actually being taken in the other classes?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java:
##########
@@ -163,9 +170,17 @@ public void run() {
});
LOG.info("Starting the Gobblin application and all its associated Services");
+ for (Service service : this.services) {
+ LOG.info("Starting service " + service.getClass().getSimpleName());
+ }
// Start the application
- this.serviceManager.startAsync().awaitHealthy();
+ try {
+ this.serviceManager.startAsync().awaitHealthy(startupTimeout.getSeconds(), TimeUnit.SECONDS);
+ } catch (TimeoutException te) {
+ LOG.error("Timeout in starting all services in service manager. Proceeding anyway to unblock shutdown hook", te);
Review Comment:
recommend logging the timeout (TO) value, e.g. "never happened in %d secs"
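Below is a minimal JDK-only sketch of that suggestion. A `CountDownLatch` stands in for Guava's `ServiceManager`. The real `awaitHealthy(long, TimeUnit)` throws `TimeoutException` rather than returning a boolean. `BoundedStartup` is a hypothetical name. The only point is that the configured timeout ends up in the log message.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Sketch only: each service is assumed to count the latch down once it is RUNNING,
// so a zero count plays the role of "service manager is healthy".
class BoundedStartup {
  private static final Logger LOG = Logger.getLogger(BoundedStartup.class.getName());

  /** Returns true if all services became healthy within the timeout, false otherwise. */
  static boolean awaitHealthy(CountDownLatch servicesRunning, Duration startupTimeout) {
    try {
      if (servicesRunning.await(startupTimeout.getSeconds(), TimeUnit.SECONDS)) {
        return true;
      }
      // Include the configured timeout so operators can tell whether it was too tight.
      LOG.severe(String.format(
          "Services never became healthy in %d secs; proceeding anyway to unblock shutdown hook",
          startupTimeout.getSeconds()));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status for the caller
    }
    return false;
  }
}
```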
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java:
##########
@@ -0,0 +1,22 @@
+package org.apache.gobblin.temporal.cluster;
+
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+
+
+/**
+ * {@link ServiceBasedAppLauncher} that does not add metrics service. This is different from deactivating the flag
+ * {@link org.apache.gobblin.configuration.ConfigurationKeys#METRICS_ENABLED_KEY} because this will only not initialize
+ * the metrics service in the cluster manager but still allow for {@link org.apache.gobblin.metrics.GobblinTrackingEvent}
+ */
+public class ServiceBasedAppLauncherWithoutMetrics extends ServiceBasedAppLauncher {
+ public ServiceBasedAppLauncherWithoutMetrics(Properties properties, String appName)
+ throws Exception {
+ super(properties, appName);
+ }
+
+ @Override
+ protected void addMetricsService(Properties properties) {
Review Comment:
alternative (w/o disabling gobblin metrics overall) would be an additional config check alongside:
```
if (GobblinMetrics.isEnabled(properties))
```
(overall I prefer to use inheritance when there's not a simpler alt)
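For comparison, here is a sketch of the config-check alternative, which needs no subclass. It rests on two assumptions. First, `isMetricsEnabled` is a local stand-in for `GobblinMetrics.isEnabled(properties)` that reads `metrics.enabled` with a default of `true`. Second, `app.metricsService.enabled` is a hypothetical key, not an existing Gobblin config.

```java
import java.util.Properties;

// Sketch only: gates the metrics service on an extra, hypothetical key instead of
// overriding addMetricsService() in a subclass.
class MetricsServiceGate {
  static final String METRICS_ENABLED_KEY = "metrics.enabled";
  static final String METRICS_SERVICE_ENABLED_KEY = "app.metricsService.enabled"; // hypothetical key

  /** Stand-in for GobblinMetrics.isEnabled(properties); assumes metrics default to on. */
  static boolean isMetricsEnabled(Properties properties) {
    return Boolean.parseBoolean(properties.getProperty(METRICS_ENABLED_KEY, "true"));
  }

  /**
   * Adds the metrics service only when metrics are enabled overall AND the extra key allows it,
   * so GobblinTrackingEvents can stay on while the metrics service itself is skipped.
   */
  static boolean shouldAddMetricsService(Properties properties) {
    return isMetricsEnabled(properties)
        && Boolean.parseBoolean(properties.getProperty(METRICS_SERVICE_ENABLED_KEY, "true"));
  }
}
```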
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -683,18 +683,10 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
- if (this.eventSubmitter.isPresent()) {
- this.eventSubmitter.get()
- .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
- }
+ // NOTE: logic for handling container failure is removed because original implementation relies on the auto scaling manager
Review Comment:
suggest replacing "original impl" w/
> the {@link org.apache.gobblin.yarn.YarnService} implementation on which this is originally based
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
cancelJob(jobListener);
}
} finally {
+ // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+ // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user
+ // during application creation, it is not possible to have multiple workflows running in the same application.
+ // and so it makes sense ti just kill the job after this is complete
Review Comment:
"to"?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java:
##########
@@ -75,6 +78,8 @@ public class ServiceBasedAppLauncher implements ApplicationLauncher {
* The name of the application. Not applicable for YARN jobs, which uses a separate key for the application name.
*/
public static final String APP_NAME = "app.name";
+ public static final String STARTUP_TIMEOUT = "app.start.waitForServices.seconds";
+ public static final Duration DEFAULT_STARTUP_TIMEOUT = ChronoUnit.FOREVER.getDuration();
Review Comment:
not clear if this relates to termination (given the context of the PR) or
startup.
anyway, what's a reasonable recommended value--is "forever" generally what
we want? javadoc should clarify such things
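Here is one way the javadoc could answer both questions. The sketch assumes the key governs startup only, as its name `app.start.waitForServices.seconds` suggests. The `startupTimeout(Properties)` helper and the `StartupTimeoutConfig` class are hypothetical. `ChronoUnit.FOREVER.getDuration().getSeconds()` is `Long.MAX_VALUE`, so the default effectively keeps the old unbounded wait. No particular recommended value is implied.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Properties;

// Sketch only: key and default are taken from the diff; the parsing helper is hypothetical.
class StartupTimeoutConfig {
  /**
   * Maximum number of seconds to wait, during application STARTUP, for all services to become
   * healthy. Not related to shutdown/termination. Defaults to "forever", i.e. wait indefinitely,
   * which matches the previous unbounded awaitHealthy() call.
   */
  static final String STARTUP_TIMEOUT = "app.start.waitForServices.seconds";
  static final Duration DEFAULT_STARTUP_TIMEOUT = ChronoUnit.FOREVER.getDuration();

  /** Reads the timeout from properties, falling back to the "forever" default when unset. */
  static Duration startupTimeout(Properties properties) {
    String seconds = properties.getProperty(STARTUP_TIMEOUT);
    return seconds == null
        ? DEFAULT_STARTUP_TIMEOUT
        : Duration.ofSeconds(Long.parseLong(seconds.trim()));
  }
}
```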
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/ProcessWorkunitsSource.java:
##########
@@ -0,0 +1,75 @@
+package org.apache.gobblin.temporal.ddm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.eventbus.EventBus;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
+import org.apache.gobblin.source.InfiniteSource;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+
+@Slf4j
+public class ProcessWorkunitsSource implements InfiniteSource {
Review Comment:
`NoWorkunitsSource` or `EmptyWorkunitsSource`... doesn't seem to be anything processing specific (also sources don't ever "process" WUs themselves, so may be confusing)
also javadoc (and note importance of `InfiniteSource`, which I presume is a key element)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
cancelJob(jobListener);
}
} finally {
+ // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+ // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user
+ // during application creation, it is not possible to have multiple workflows running in the same application.
Review Comment:
regarding what I just said about inheritance, this might be a situation
where that is in fact preferable to sticking opinionated behavior into a base
class.
e.g. no more than a hook here, like `handleLaunchFinalization()`. then
create a derived class like `RunAndDoneGobblinJL` or `TerminateAfterLaunchGJL`
that sends the cluster shutdown request. then it's up to the specific job
launchers to either take on this functionality or go w/o it by deciding which
to extend.
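Here is a sketch of the hook-based split described above. A `Consumer<Object>` stands in for Guava's `EventBus`, and a string stands in for `ClusterManagerShutdownRequest`. `GobblinJobLauncherSketch` is a hypothetical stand-in for the base launcher. `handleLaunchFinalization()` and `TerminateAfterLaunchGJL` are the names proposed in the comment.

```java
import java.util.function.Consumer;

// Sketch only: the base launcher exposes a no-op hook instead of hard-coding shutdown.
class GobblinJobLauncherSketch {
  protected final Consumer<Object> eventBus;

  GobblinJobLauncherSketch(Consumer<Object> eventBus) {
    this.eventBus = eventBus;
  }

  final void launchJob() {
    try {
      runJob();
    } finally {
      // Base class stays unopinionated: subclasses decide what happens after a launch.
      handleLaunchFinalization();
    }
  }

  protected void runJob() {
    // job execution elided in this sketch
  }

  /** No-op by default. */
  protected void handleLaunchFinalization() {
  }
}

/** Opts into "run once, then shut the cluster down" batch semantics. */
class TerminateAfterLaunchGJL extends GobblinJobLauncherSketch {
  TerminateAfterLaunchGJL(Consumer<Object> eventBus) {
    super(eventBus);
  }

  @Override
  protected void handleLaunchFinalization() {
    eventBus.accept("ClusterManagerShutdownRequest");
  }
}
```

Launchers that want batch semantics extend `TerminateAfterLaunchGJL`. All others inherit the no-op hook and leave the cluster running.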
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
cancelJob(jobListener);
}
} finally {
+ // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+ // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user
Review Comment:
nit: "for achieving batch job behavior." ? (i.e. drop GaaS)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
cancelJob(jobListener);
}
} finally {
+ // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+ // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user
+ // during application creation, it is not possible to have multiple workflows running in the same application.
+ // and so it makes sense ti just kill the job after this is complete
+ log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId());
+ eventBus.post(new ClusterManagerShutdownRequest());
Review Comment:
any consequence to doing this before checking `isLaunched`?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -205,11 +206,10 @@ public YarnService(Config config, String applicationName, String applicationId,
this.eventBus = eventBus;
- this.gobblinMetrics = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
- Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();
-
- this.eventSubmitter = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
- Optional.of(buildEventSubmitter()) : Optional.<EventSubmitter>absent();
+ // Gobblin metrics have been disabled in the PoC temporarily to make testing kafka integration without having to worry
+ // how the metrics topics are used. Kafka will only be used for GobblinTrackingEvents in the initial POC
Review Comment:
let's steer away from PoC, which maintainers and those considering reuse may not have context on. instead speak to underlying behavior/semantics (and perhaps suggest when desirable)
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java:
##########
@@ -163,9 +170,17 @@ public void run() {
});
LOG.info("Starting the Gobblin application and all its associated Services");
+ for (Service service : this.services) {
+ LOG.info("Starting service " + service.getClass().getSimpleName());
+ }
// Start the application
- this.serviceManager.startAsync().awaitHealthy();
+ try {
+ this.serviceManager.startAsync().awaitHealthy(startupTimeout.getSeconds(), TimeUnit.SECONDS);
Review Comment:
what does it mean for the "start call [to] last indefinitely"?
- the `startAsync` (it's not actually async)?
- or the `awaitHealthy` (e.g. the FSConfigMgr never actually becomes
healthy)?
also, if `awaitHealthy` never returns does that impede our ability to
fail-fast when there's a legit problem, like the deployment is DOA?
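On the fail-fast question, Guava documents that `ServiceManager.awaitHealthy` throws `IllegalStateException` once the manager reaches a state from which it cannot become healthy, e.g. when a service fails. An outright failure should therefore not hang. An unbounded wait mainly bites when a service stays in STARTING. The sketch below models that distinction with a `CompletableFuture` standing in for "all services healthy". `StartupOutcome` and its `Result` values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: a bounded wait still fails fast on an outright failure, and only falls back
// to the timeout when something never becomes healthy.
class StartupOutcome {
  enum Result { HEALTHY, FAILED, TIMED_OUT, INTERRUPTED }

  static Result await(CompletableFuture<Void> allHealthy, long timeoutSecs) {
    try {
      allHealthy.get(timeoutSecs, TimeUnit.SECONDS);
      return Result.HEALTHY;
    } catch (ExecutionException e) {
      // A failed service completes the future exceptionally: surfaced immediately (e.g. DOA deployment).
      return Result.FAILED;
    } catch (TimeoutException e) {
      // Nothing failed, but something never became healthy within the bound.
      return Result.TIMED_OUT;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status for the caller
      return Result.INTERRUPTED;
    }
  }
}
```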
Issue Time Tracking
-------------------
Worklog Id: (was: 902895)
Time Spent: 40m (was: 0.5h)
> Add ability for Yarn app to terminate on finishing of temporal flow
> -------------------------------------------------------------------
>
> Key: GOBBLIN-1996
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1996
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)