[ 
https://issues.apache.org/jira/browse/GOBBLIN-1187?focusedWorklogId=448709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-448709
 ]

ASF GitHub Bot logged work on GOBBLIN-1187:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jun/20 23:22
            Start Date: 19/Jun/20 23:22
    Worklog Time Spent: 10m 
      Work Description: arjun4084346 commented on a change in pull request 
#3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r443075847



##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -293,28 +301,34 @@ public void orchestrate(Spec spec) throws Exception {
         this.dagManager.get().addDag(jobExecutionPlanDag, true);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
+        // This assumes that the JobSpecs do not have any dependency on each 
other and all can run together
         for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
           DagManagerUtils.incrementJobAttempt(dagNode);
           JobExecutionPlan jobExecutionPlan = dagNode.getValue();
 
           // Run this spec on selected executor
-          SpecProducer producer = null;
+          SpecProducer<Spec> producer = null;
           try {
             producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-            Spec jobSpec = jobExecutionPlan.getJobSpec();
+            JobSpec jobSpec = jobExecutionPlan.getJobSpec();
 
-            if (!((JobSpec) 
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+            if 
(!jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
               _log.warn("JobSpec does not contain flowExecutionId.");
             }
 
             Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
-            _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
+            _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec.toString(), producer));
 
             TimingEvent jobOrchestrationTimer = 
this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
                 getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : 
null;
 
             producer.addSpec(jobSpec);
 
+            if (!specProducerToSpecs.containsKey(producer)) {

Review comment:
       Correct me if I misunderstood; specProducerToSpecs is a mapping from 
producer -> list of specs processed by that producer. This map should have size 
of k, where k is the number of kafka topics. Orchestrator.remove() is removing 
the specs from the the value part of this map.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 448709)
    Time Spent: 3h 10m  (was: 3h)

> cancel a gaas flow when the dag manager is disabled
> ---------------------------------------------------
>
>                 Key: GOBBLIN-1187
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1187
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to