umustafi commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1700753986


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -33,7 +33,7 @@
  * See javadoc for {@link DagAction}
  */
 public interface DagActionStore {
-  public static final String NO_JOB_NAME_DEFAULT = "";
+  String NO_JOB_NAME_DEFAULT = "";

Review Comment:
   Why not `static final`?
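For context on this question: the Java Language Specification makes every field declared in an interface implicitly `public`, `static`, and `final`. Dropping the modifiers should therefore not change how `NO_JOB_NAME_DEFAULT` behaves; the explicit modifiers are redundant, which is likely why they were removed. Below is a minimal sketch that checks this with reflection. `ConstantHolder` and `InterfaceFieldModifiers` are hypothetical names standing in for `DagActionStore`:

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-in for DagActionStore: only the constant matters here.
interface ConstantHolder {
  // No modifiers written, yet the JLS makes interface fields implicitly public, static, and final.
  String NO_JOB_NAME_DEFAULT = "";
}

class InterfaceFieldModifiers {
  // Returns true if the named field exists and carries all three modifiers.
  static boolean isPublicStaticFinal(Class<?> type, String fieldName) {
    try {
      int mods = type.getDeclaredField(fieldName).getModifiers();
      return Modifier.isPublic(mods) && Modifier.isStatic(mods) && Modifier.isFinal(mods);
    } catch (NoSuchFieldException e) {
      return false; // no such field, so it cannot be a constant
    }
  }

  public static void main(String[] args) {
    System.out.println(isPublicStaticFinal(ConstantHolder.class, "NO_JOB_NAME_DEFAULT")); // prints "true"
  }
}
```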



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -120,6 +120,24 @@ && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)
     }
   }
 
+  /**
+   * This method is used in the multi-active scheduler case for one or more hosts to respond to a kill dag action
+   * event triggered by the Orchestrator by attempting a lease for the kill event and processing the result depending on
+   * the status of the attempt.
+   */
+  public void handleFlowKillTriggerEvent(Properties jobProps, DagActionStore.LeaseParams leaseParams) throws IOException {

Review Comment:
   Do we need this for resume as well? Also, this duplicates the launch code, so we could extract the shared logic into a common function that both `handleFlowLaunchTriggerEvent` and `handleFlowKillTriggerEvent` call.
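A minimal sketch of the suggested refactor, assuming the launch and kill handlers differ only in the action type they lease. Every type and method here (`ActionType`, `LeaseArbiter`, `TriggerEventHandler`, `handleFlowResumeTriggerEvent`) is a hypothetical simplification, not Gobblin's actual `LeaseParams`/`LeaseAttemptStatus` API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types for illustration; Gobblin's real lease API differs.
enum ActionType { LAUNCH, KILL, RESUME }

// Hypothetical arbiter: returns true if this host obtained the lease for the given action.
interface LeaseArbiter {
  boolean tryAcquireLease(String flowId, ActionType type);
}

class TriggerEventHandler {
  private final LeaseArbiter arbiter;
  // Actions this host won the lease for (kept in memory for the sketch instead of persisted).
  final List<String> persisted = new ArrayList<>();

  TriggerEventHandler(LeaseArbiter arbiter) {
    this.arbiter = arbiter;
  }

  public void handleFlowLaunchTriggerEvent(String flowId) {
    handleTriggerEvent(flowId, ActionType.LAUNCH);
  }

  public void handleFlowKillTriggerEvent(String flowId) {
    handleTriggerEvent(flowId, ActionType.KILL);
  }

  // Hypothetical resume handler: reuses the same path instead of a third copy of the lease logic.
  public void handleFlowResumeTriggerEvent(String flowId) {
    handleTriggerEvent(flowId, ActionType.RESUME);
  }

  // Shared logic: attempt the lease once, then act on the outcome.
  private void handleTriggerEvent(String flowId, ActionType type) {
    if (arbiter.tryAcquireLease(flowId, type)) {
      // Lease obtained: record the action so that only this host processes it.
      persisted.add(type + ":" + flowId);
    }
    // Otherwise another host holds the lease; a real implementation might schedule a reminder here.
  }
}
```

With the shared helper, each action type becomes a one-line wrapper, so the lease-handling behavior cannot drift between launch, kill, and resume.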



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
           dagTask.conclude();
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+          if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+            // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
+            dagTask.conclude();
+            dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+          }
           dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();

Review Comment:
   This should be in an `else` block; otherwise we are double counting.
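A minimal sketch of the `if`/`else` structure, in which each failure increments exactly one counter. `AtomicLong` counters stand in for the two meters, `DagProcErrorAccounting` is a hypothetical name, and the cause-chain check is an assumption about what `KafkaJobStatusMonitor.isThrowableInstanceOf` does:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the catch block in DagProcEngineThread; AtomicLong counters replace
// dagProcessingExceptionMeter and dagProcessingNonRetryableExceptionMeter.
class DagProcErrorAccounting {
  final AtomicLong dagProcessingExceptions = new AtomicLong();
  final AtomicLong dagProcessingNonRetryableExceptions = new AtomicLong();
  private final List<Class<? extends Throwable>> nonRetryableTypes;

  DagProcErrorAccounting(List<Class<? extends Throwable>> nonRetryableTypes) {
    this.nonRetryableTypes = nonRetryableTypes;
  }

  // Assumed semantics: match the exception or any of its causes against the configured types.
  boolean isNonRetryable(Throwable t) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      for (Class<? extends Throwable> type : nonRetryableTypes) {
        if (type.isInstance(cur)) {
          return true;
        }
      }
    }
    return false;
  }

  // Returns true if the task should be concluded (not retried); marks exactly one counter per failure.
  boolean onFailure(Throwable e) {
    if (isNonRetryable(e)) {
      dagProcessingNonRetryableExceptions.incrementAndGet();
      return true;
    } else {
      dagProcessingExceptions.incrementAndGet();
      return false;
    }
  }
}
```

With this structure the generic exception meter counts only retryable failures, so the total failure count is the sum of the two meters.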



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -103,6 +104,8 @@ public void activate() {
           ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
       dagProcessingExceptionMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
           ServiceMetricNames.DAG_PROCESSING_EXCEPTION_METER));
+      dagProcessingNonRetryableExceptionMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.DAG_PROCESSING_NON_RETRYABLE_EXCEPTION_METER));

Review Comment:
   Should we move both to `DagProcessingEngineMetrics`, or do we want to do that in a later PR where the two are merged?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
   }
 
   public void remove(Spec spec, Properties headers) throws IOException {
+    URI uri = spec.getUri();
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all executions
     if (spec instanceof FlowSpec) {
-      //Send the dag to the DagManager to stop it.
-      //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
-      _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
-      this.dagManager.stopDag(spec.getUri());
+      String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+      String flowName = FlowSpec.Utils.getFlowName(uri);
+      if (this.flowLaunchHandler.isPresent()) {
+        List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+        for (long flowExecutionId : flowExecutionIds) {
+        DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+            DagActionStore.DagActionType.KILL);

Review Comment:
   Is this only for the case where we kill previous executions of a flow before a launch, and not for a user-invoked kill? Why are we calling this in the Orchestrator? AFAIK it should only deal with launches.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -76,26 +75,21 @@ public void setUp() throws Exception {
         .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl())
         .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
         .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
-        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE);
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE)
+        .addPrimitive(ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY, NonTransientException.class.getName());

Review Comment:
   How are `NonTransientException`s classified? Can we test specifically with the ServiceAcl error?
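One plausible classification scheme, given that the test registers `NonTransientException.class.getName()` under `DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY`: load each configured class name, and treat an exception as non-retryable if it or any of its causes is an instance of one of them. The comma-separated config format, the cause-chain walk, and every class below (including `HypotheticalServiceAclException`, used in place of the real ServiceAcl error) are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the real NonTransientException and ServiceAcl error in Gobblin may differ.
class NonTransientException extends Exception {
  NonTransientException(String msg) { super(msg); }
}

class HypotheticalServiceAclException extends NonTransientException {
  HypotheticalServiceAclException(String msg) { super(msg); }
}

class NonRetryableClassifier {
  private final List<Class<?>> types = new ArrayList<>();

  // Assumes the config value is a comma-separated list of fully qualified class names.
  NonRetryableClassifier(String configValue) {
    for (String name : configValue.split(",")) {
      String trimmed = name.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      try {
        types.add(Class.forName(trimmed));
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Unknown exception class: " + trimmed, e);
      }
    }
  }

  // Non-retryable if the exception or any cause is an instance of a configured type,
  // so subclasses (e.g. an ACL failure extending NonTransientException) match as well.
  boolean isNonRetryable(Throwable t) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      for (Class<?> type : types) {
        if (type.isInstance(cur)) {
          return true;
        }
      }
    }
    return false;
  }
}
```

Under these assumptions, a dedicated test could throw the actual ServiceAcl error from a mocked dag proc and assert that the task is concluded and the non-retryable meter is marked once.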



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
