[ 
https://issues.apache.org/jira/browse/GOBBLIN-1996?focusedWorklogId=902895&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-902895
 ]

ASF GitHub Bot logged work on GOBBLIN-1996:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jan/24 20:36
            Start Date: 31/Jan/24 20:36
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3865:
URL: https://github.com/apache/gobblin/pull/3865#discussion_r1473396073


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -146,23 +152,17 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
       // Start the output TaskState collector service
       this.taskStateCollectorService.startAsync().awaitRunning();
 
-      TimingEvent jobSubmissionTimer =
-          this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.HELIX_JOB_SUBMISSION);

Review Comment:
   are you removing in conjunction w/ the idea that we don't want noisy metrics 
(that you mentioned)?  if so, are we certain the "metrics disabling" code path 
is actually being taken in the other classes?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java:
##########
@@ -163,9 +170,17 @@ public void run() {
     });
 
     LOG.info("Starting the Gobblin application and all its associated Services");
+    for (Service service : this.services) {
+      LOG.info("Starting service " + service.getClass().getSimpleName());
+    }
 
     // Start the application
-    this.serviceManager.startAsync().awaitHealthy();
+    try {
+      this.serviceManager.startAsync().awaitHealthy(startupTimeout.getSeconds(), TimeUnit.SECONDS);
+    } catch (TimeoutException te) {
+      LOG.error("Timeout in starting all services in service manager. Proceeding anyway to unblock shutdown hook", te);

Review Comment:
   recommend logging the timeout value (e.g. "never happened in %d secs")
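
   A minimal sketch of that suggestion, using only the JDK. The class and
   method names are hypothetical, and the `CountDownLatch` is just a stand-in
   for the service manager's bounded health wait; it is not the PR's code.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StartupTimeoutLogging {
  /** Formats the timeout message so the configured limit shows up in the log. */
  static String timeoutMessage(Duration startupTimeout) {
    return String.format(
        "Services not healthy within %d secs. Proceeding anyway to unblock shutdown hook",
        startupTimeout.getSeconds());
  }

  /** Hypothetical stand-in for a bounded health wait: throws if the latch is not released in time. */
  static void awaitHealthy(CountDownLatch healthy, Duration timeout)
      throws TimeoutException, InterruptedException {
    if (!healthy.await(timeout.getSeconds(), TimeUnit.SECONDS)) {
      throw new TimeoutException("services not healthy");
    }
  }

  /** Returns the message that would be logged, or null when startup finished in time. */
  static String startAndReport(CountDownLatch healthy, Duration timeout)
      throws InterruptedException {
    try {
      awaitHealthy(healthy, timeout);
      return null;
    } catch (TimeoutException te) {
      return timeoutMessage(timeout);
    }
  }
}
```

   In the real launcher the returned string would go to `LOG.error(..., te)`
   rather than back to the caller.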



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/ServiceBasedAppLauncherWithoutMetrics.java:
##########
@@ -0,0 +1,22 @@
+package org.apache.gobblin.temporal.cluster;
+
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+
+
+/**
+ * {@link ServiceBasedAppLauncher} that does not add metrics service. This is different from deactivating the flag
+ * {@link org.apache.gobblin.configuration.ConfigurationKeys#METRICS_ENABLED_KEY} because this will only not initialize
+ * the metrics service in the cluster manager but still allow for {@link org.apache.gobblin.metrics.GobblinTrackingEvent}
+ */
+public class ServiceBasedAppLauncherWithoutMetrics extends ServiceBasedAppLauncher {
+  public ServiceBasedAppLauncherWithoutMetrics(Properties properties, String appName)
+      throws Exception {
+    super(properties, appName);
+  }
+
+  @Override
+  protected void addMetricsService(Properties properties) {

Review Comment:
   alternative (w/o disabling gobblin metrics overall) would be a further
   config checked alongside:
   ```
   if (GobblinMetrics.isEnabled(properties))
   ```
   (overall I prefer to use inheritance only when there's not a simpler alt)
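
   A rough sketch of the config-based alternative. The key name below is
   made up for illustration (it is not an existing Gobblin configuration
   key), and the `metricsEnabled` argument stands in for the result of
   `GobblinMetrics.isEnabled(properties)` so the snippet stays JDK-only.

```java
import java.util.Properties;

final class MetricsServiceToggle {
  // Hypothetical key, assumed for this example only.
  static final String METRICS_SERVICE_ENABLED_KEY = "example.metrics.service.enabled";

  /**
   * Decides whether the launcher should register the metrics service.
   * The extra flag defaults to true, so existing deployments keep their behavior.
   */
  static boolean shouldAddMetricsService(Properties properties, boolean metricsEnabled) {
    boolean serviceEnabled =
        Boolean.parseBoolean(properties.getProperty(METRICS_SERVICE_ENABLED_KEY, "true"));
    return metricsEnabled && serviceEnabled;
  }
}
```

   With a flag like this, tracking events could stay enabled while the
   metrics service itself is skipped, without a dedicated subclass.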



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -683,18 +683,10 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
       LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
       this.unusedHelixInstanceNames.add(completedInstanceName);
 
-      if (this.eventSubmitter.isPresent()) {
-        this.eventSubmitter.get()
-            .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
-      }
+      // NOTE: logic for handling container failure is removed because original implementation relies on the auto scaling manager

Review Comment:
   suggest replacing "original implementation" w/
   > the {@link org.apache.gobblin.yarn.YarnService} implementation on which this is originally based



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
         cancelJob(jobListener);
       }
     } finally {
+      // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+      // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user
+      // during application creation, it is not possible to have multiple workflows running in the same application.
+      // and so it makes sense ti just kill the job after this is complete

Review Comment:
   "to"?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java:
##########
@@ -75,6 +78,8 @@ public class ServiceBasedAppLauncher implements ApplicationLauncher {
    * The name of the application. Not applicable for YARN jobs, which uses a separate key for the application name.
    */
   public static final String APP_NAME = "app.name";
+  public static final String STARTUP_TIMEOUT = "app.start.waitForServices.seconds";
+  public static final Duration DEFAULT_STARTUP_TIMEOUT = ChronoUnit.FOREVER.getDuration();

Review Comment:
   not clear if this relates to termination (given the context of the PR) or 
startup.
   
   anyway, what's a reasonable recommended value--is "forever" generally what 
we want?  javadoc should clarify such things
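
   For illustration, one way the javadoc could read. The two constants
   mirror the diff, while the wording of the javadoc, the class name, and the
   `resolve` helper are assumptions. Note that
   `ChronoUnit.FOREVER.getDuration().getSeconds()` is `Long.MAX_VALUE`, so the
   default amounts to an unbounded wait.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Properties;

final class StartupTimeoutConfig {
  /**
   * Maximum number of seconds to wait, at application startup, for all services
   * to become healthy. Applies to startup only, not to termination.
   */
  static final String STARTUP_TIMEOUT = "app.start.waitForServices.seconds";

  /** Default: wait indefinitely; getSeconds() on this duration is Long.MAX_VALUE. */
  static final Duration DEFAULT_STARTUP_TIMEOUT = ChronoUnit.FOREVER.getDuration();

  /** Hypothetical helper: reads the timeout from properties, falling back to the default. */
  static Duration resolve(Properties properties) {
    String raw = properties.getProperty(STARTUP_TIMEOUT);
    return raw == null ? DEFAULT_STARTUP_TIMEOUT : Duration.ofSeconds(Long.parseLong(raw.trim()));
  }
}
```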



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/ProcessWorkunitsSource.java:
##########
@@ -0,0 +1,75 @@
+package org.apache.gobblin.temporal.ddm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.eventbus.EventBus;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.extractor.InstrumentedExtractor;
+import org.apache.gobblin.source.InfiniteSource;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+
+@Slf4j
+public class ProcessWorkunitsSource implements InfiniteSource {

Review Comment:
   `NoWorkunitsSource` or `EmptyWorkunitsSource`... doesn't seem to be anything
   processing specific (also sources don't ever "process" WUs themselves, so may be
   confusing)
   
   also javadoc (and note importance of `InfiniteSource`, which I presume is a key
   element)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
         cancelJob(jobListener);
       }
     } finally {
+      // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+      // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user
+      // during application creation, it is not possible to have multiple workflows running in the same application.

Review Comment:
   regarding what I just said about inheritance, this might be a situation 
where that is in fact preferable to sticking opinionated behavior into a base 
class.
   
   e.g. no more than a hook here, like `handleLaunchFinalization()`.  then 
create a derived class like `RunAndDoneGobblinJL` or `TerminateAfterLaunchGJL` 
that sends the cluster shutdown request.  then it's up to the specific job 
launchers to either take on this functionality or go w/o it by deciding which 
to extend.
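
   A stripped-down sketch of the hook idea, JDK-only. Apart from
   `handleLaunchFinalization()`, the names are illustrative, and the recorded
   strings stand in for the real work (running the job, posting the cluster
   shutdown request to the event bus).

```java
import java.util.ArrayList;
import java.util.List;

class BaseJobLauncher {
  private final List<String> events = new ArrayList<>();

  /** Runs the job, then always gives subclasses a chance to finalize. */
  public final void launchJob() {
    try {
      record("run");
    } finally {
      handleLaunchFinalization();
    }
  }

  /** Hook: a no-op in the base class, so it carries no opinionated behavior. */
  protected void handleLaunchFinalization() { }

  protected void record(String event) {
    events.add(event);
  }

  public List<String> events() {
    return events;
  }
}

/** Launcher variant that requests cluster shutdown once the launch is over. */
class TerminateAfterLaunchJobLauncher extends BaseJobLauncher {
  @Override
  protected void handleLaunchFinalization() {
    // Stand-in for posting a shutdown request to the event bus.
    record("shutdown-requested");
  }
}
```

   A job launcher then opts into termination simply by extending the
   derived class instead of the base one.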



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
         cancelJob(jobListener);
       }
     } finally {
+      // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+      // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user

Review Comment:
   nit: "for achieving batch job behavior." ?  (i.e. drop GaaS)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java:
##########
@@ -206,6 +206,13 @@ public void launchJob(@Nullable JobListener jobListener) throws JobException {
         cancelJob(jobListener);
       }
     } finally {
+      // NOTE: This code only makes sense when there is 1 source / workflow being launched per application. This is a stop-gap
+      // for recreating batch job behavior in GaaS. Given the current constraints of yarn applications requiring a static proxy user
+      // during application creation, it is not possible to have multiple workflows running in the same application.
+      // and so it makes sense ti just kill the job after this is complete
+      log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId());
+      eventBus.post(new ClusterManagerShutdownRequest());

Review Comment:
   any consequence to doing this before checking `isLaunched`?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -205,11 +206,10 @@ public YarnService(Config config, String applicationName, String applicationId,
 
     this.eventBus = eventBus;
 
-    this.gobblinMetrics = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
-        Optional.of(buildGobblinMetrics()) : Optional.<GobblinMetrics>absent();
-
-    this.eventSubmitter = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
-        Optional.of(buildEventSubmitter()) : Optional.<EventSubmitter>absent();
+    // Gobblin metrics have been disabled in the PoC temporarily to make testing kafka integration without having to worry
+    // how the metrics topics are used. Kafka will only be used for GobblinTrackingEvents in the initial POC

Review Comment:
   let's steer away from PoC, which maintainers and those considering reuse may
   not have context on.  instead speak to underlying behavior/semantics (and
   perhaps suggest when desirable)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/app/ServiceBasedAppLauncher.java:
##########
@@ -163,9 +170,17 @@ public void run() {
     });
 
     LOG.info("Starting the Gobblin application and all its associated Services");
+    for (Service service : this.services) {
+      LOG.info("Starting service " + service.getClass().getSimpleName());
+    }
 
     // Start the application
-    this.serviceManager.startAsync().awaitHealthy();
+    try {
+      this.serviceManager.startAsync().awaitHealthy(startupTimeout.getSeconds(), TimeUnit.SECONDS);

Review Comment:
   what does it mean for the "start call [to] last indefinitely"?
   
   - the `startAsync` (it's not actually async)?
    
   - or the `awaitHealthy` (e.g. the FSConfigMgr never actually becomes 
healthy)?
   
   also, if `awaitHealthy` never returns does that impede our ability to 
fail-fast when there's a legit problem, like the deployment is DOA?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 902895)
    Time Spent: 40m  (was: 0.5h)

> Add ability for Yarn app to terminate on finishing of temporal flow
> -------------------------------------------------------------------
>
>                 Key: GOBBLIN-1996
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1996
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
