[ 
https://issues.apache.org/jira/browse/GOBBLIN-2121?focusedWorklogId=928352&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928352
 ]

ASF GitHub Bot logged work on GOBBLIN-2121:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Aug/24 20:07
            Start Date: 01/Aug/24 20:07
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1700753986


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -33,7 +33,7 @@
  * See javadoc for {@link DagAction}
  */
 public interface DagActionStore {
-  public static final String NO_JOB_NAME_DEFAULT = "";
+  String NO_JOB_NAME_DEFAULT = "";

Review Comment:
   why not static final?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -120,6 +120,24 @@ && 
persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)
     }
   }
 
+  /**
+   * This method is used in the multi-active scheduler case for one or more 
hosts to respond to a kill dag action
+   * event triggered by the Orchestrator by attempting a lease for the kill 
event and processing the result depending on
+   * the status of the attempt.
+   */
+  public void handleFlowKillTriggerEvent(Properties jobProps, 
DagActionStore.LeaseParams leaseParams) throws IOException {

Review Comment:
   Do we need this for resume as well? Also, this duplicates code from the launch
path, so we can probably extract the common logic into a shared function that
both `handleFlowLaunchTriggerEvent` and `handleFlowKillTriggerEvent` use.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
           dagTask.conclude();
         } catch (Exception e) {
           log.error("DagProcEngineThread encountered exception while 
processing dag " + dagProc.getDagId(), e);
+          if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, 
this.nonRetryableExceptions)) {
+            // conclude the lease so that it is not retried, if the dag proc 
fails with non-transient exception
+            dagTask.conclude();
+            
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+          }
           
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();

Review Comment:
   This should be in an else block; otherwise we are double counting.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -103,6 +104,8 @@ public void activate() {
           ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
       dagProcessingExceptionMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
           ServiceMetricNames.DAG_PROCESSING_EXCEPTION_METER));
+      dagProcessingNonRetryableExceptionMeter = 
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.DAG_PROCESSING_NON_RETRYABLE_EXCEPTION_METER));

Review Comment:
   Should we move both to DagProcessingEngineMetrics, or do we want to do that
in a later PR where the two are merged?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, 
Dag<JobExecutionPlan> jobE
   }
 
   public void remove(Spec spec, Properties headers) throws IOException {
+    URI uri = spec.getUri();
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all 
executions
     if (spec instanceof FlowSpec) {
-      //Send the dag to the DagManager to stop it.
-      //Also send it to the SpecProducer to do any cleanup tasks on 
SpecExecutor.
-      _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
spec.getUri());
-      this.dagManager.stopDag(spec.getUri());
+      String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+      String flowName = FlowSpec.Utils.getFlowName(uri);
+      if (this.flowLaunchHandler.isPresent()) {
+        List<Long> flowExecutionIds = 
this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+        _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+        for (long flowExecutionId : flowExecutionIds) {
+        DagActionStore.DagAction killDagAction = 
DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+            DagActionStore.DagActionType.KILL);

Review Comment:
   Is this only for the case where we kill previous executions of a flow before
a launch, rather than a user-invoked kill? Why are we calling this in the
Orchestrator? AFAIK it should only deal with launches.



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -76,26 +75,21 @@ public void setUp() throws Exception {
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
-        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE)
+        
.addPrimitive(ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY, 
NonTransientException.class.getName());

Review Comment:
   How are NonTransientExceptions classified? Can we test specifically with the
ServiceAcl error?





Issue Time Tracking
-------------------

            Worklog Id:     (was: 928352)
    Remaining Estimate: 0h
            Time Spent: 10m

>  redirect kill requests to dag proc engine
> ------------------------------------------
>
>                 Key: GOBBLIN-2121
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2121
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to