[ 
https://issues.apache.org/jira/browse/GOBBLIN-1831?focusedWorklogId=861304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-861304
 ]

ASF GitHub Bot logged work on GOBBLIN-1831:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/May/23 07:35
            Start Date: 10/May/23 07:35
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3694:
URL: https://github.com/apache/gobblin/pull/3694#discussion_r1189465710


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java:
##########
@@ -136,14 +138,30 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
             break;
           case DELETE:
             this.removedSpecs.mark();
-            URI jobSpecUri = parsedMessage.getUri();
-            this.jobCatalog.remove(jobSpecUri);
+            this.jobCatalog.remove(parsedMessage.getUri());
             // Delete the job state if it is a delete spec request
-            deleteStateStore(jobSpecUri);
+            deleteStateStore(parsedMessage.getUri());
             break;
           case CANCEL:
-            this.cancelledSpecs.mark();
-            this.jobCatalog.remove(parsedMessage.getUri(), true);
+            // Validate that the flow execution ID of the running flow matches the one in the incoming job spec
+            URI specUri = parsedMessage.getUri();
+            if (!parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+              this.cancelledSpecs.mark();
+              this.jobCatalog.remove(specUri, true);
+            } else {
+              try {
+                JobSpec spec = this.jobCatalog.getJobSpec(specUri);
+                String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+                if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
+                  this.cancelledSpecs.mark();
+                  this.jobCatalog.remove(specUri, true);
+                } else {
+                  log.warn("Could not find job spec {} with flow execution ID {} to cancel", specUri, flowIdToCancel);

Review Comment:
   let's log what the value of `spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)` actually is.
   
   also, could this ever be `null`?
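
   A minimal sketch of what that could look like, not the PR's actual code. It uses a plain `Map` as a stand-in for the spec config, and the class name, method names, key string, and message wording are all assumptions made for illustration. In the PR the configs are Typesafe `Config` objects, where, as far as I know, `getString` on an absent path throws `ConfigException.Missing` instead of returning `null`, so the missing-key case there would surface as an exception rather than a `null` comparison.

```java
import java.util.Map;

final class CancelLogSketch {
  // Assumed stand-in for ConfigurationKeys.FLOW_EXECUTION_ID_KEY.
  static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

  /** Null-safe comparison: a missing ID on either side never matches. */
  static boolean idsMatch(Map<String, String> runningConfig, String flowIdToCancel) {
    return flowIdToCancel != null
        && flowIdToCancel.equals(runningConfig.get(FLOW_EXECUTION_ID_KEY));
  }

  /** Builds the warning text, including the running spec's actual ID (or a marker if absent). */
  static String describeMismatch(Map<String, String> runningConfig, String specUri, String flowIdToCancel) {
    String actual = runningConfig.get(FLOW_EXECUTION_ID_KEY); // null when the key is absent
    String shown = (actual == null) ? "<missing>" : actual;
    return String.format(
        "Could not find job spec %s with flow execution ID %s to cancel (running flow execution ID: %s)",
        specUri, flowIdToCancel, shown);
  }
}
```

   The string returned by `describeMismatch` would be passed to `log.warn`, so a mismatch shows both the requested and the running ID.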





Issue Time Tracking
-------------------

    Worklog Id:     (was: 861304)
    Time Spent: 0.5h  (was: 20m)

> Use Flow Execution ID in Gobblin cluster cancellation semantics and jobname 
> IDs if possible
> -------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1831
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1831
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: William Lo
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When executing jobs from GaaS on Gobblin cluster, the flow execution ID of
> a request can fail to match the job actually running on Gobblin cluster.
> To address this, we tried adding the FlowExecutionId to the jobSpec, but that
> meant that jobs could run concurrently on Gobblin cluster when they should
> have been deduped.
> So instead, during cancellation, we want to check whether the incoming spec
> has a flow execution ID. If so, the existing job is cancelled only if the
> flow execution IDs match. Otherwise, the current job does not match the
> incoming request and should not be deleted.
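
The cancellation rule described above can be sketched as a small decision function. This is an illustration under stated assumptions, not the code from the PR: the class and method names are hypothetical, plain `Map`s stand in for the spec configs, the key string stands in for `ConfigurationKeys.FLOW_EXECUTION_ID_KEY`, and treating a running job with no flow execution ID as a non-match is a choice made here rather than something the issue specifies.

```java
import java.util.Map;

final class CancelDecisionSketch {
  // Assumed stand-in for ConfigurationKeys.FLOW_EXECUTION_ID_KEY.
  static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

  /**
   * Decides whether a cancel request should remove the running job.
   * - Request carries no flow execution ID: cancel unconditionally (the old behavior).
   * - Request carries an ID: cancel only if the running job has the same ID.
   */
  static boolean shouldCancel(Map<String, String> runningConfig, Map<String, String> incomingConfig) {
    String requested = incomingConfig.get(FLOW_EXECUTION_ID_KEY);
    if (requested == null) {
      return true;
    }
    // Assumption: a running job without an ID is treated as a mismatch.
    return requested.equals(runningConfig.get(FLOW_EXECUTION_ID_KEY));
  }
}
```

With this shape, a stale cancel request for an earlier flow execution leaves a newer run of the same job untouched.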



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
