[
https://issues.apache.org/jira/browse/GOBBLIN-1831?focusedWorklogId=861304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-861304
]
ASF GitHub Bot logged work on GOBBLIN-1831:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/May/23 07:35
Start Date: 10/May/23 07:35
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3694:
URL: https://github.com/apache/gobblin/pull/3694#discussion_r1189465710
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java:
##########
@@ -136,14 +138,30 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
break;
case DELETE:
this.removedSpecs.mark();
- URI jobSpecUri = parsedMessage.getUri();
- this.jobCatalog.remove(jobSpecUri);
+ this.jobCatalog.remove(parsedMessage.getUri());
// Delete the job state if it is a delete spec request
- deleteStateStore(jobSpecUri);
+ deleteStateStore(parsedMessage.getUri());
break;
case CANCEL:
- this.cancelledSpecs.mark();
- this.jobCatalog.remove(parsedMessage.getUri(), true);
+ // Validate that the flow execution ID of the running flow matches the one in the incoming job spec
+ URI specUri = parsedMessage.getUri();
+ if (!parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ this.cancelledSpecs.mark();
+ this.jobCatalog.remove(specUri, true);
+ } else {
+ try {
+ JobSpec spec = this.jobCatalog.getJobSpec(specUri);
+ String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
+ this.cancelledSpecs.mark();
+ this.jobCatalog.remove(specUri, true);
+ } else {
+ log.warn("Could not find job spec {} with flow execution ID {} to cancel", specUri, flowIdToCancel);
Review Comment:
Let's also log the actual value of
`spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)`.
Also, could this ever be `null`?
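One way to address both points is sketched below. It is only an illustration, not the code in the PR: a plain `Map` stands in for the spec's config, and `CancelLogSketch`, `runningFlowExecutionId`, `mismatchMessage`, and the key string are hypothetical names chosen for this example. (If the config is a Typesafe `Config`, `getString` throws `ConfigException.Missing` for an absent path rather than returning `null`, so the missing-key case would still need to be handled explicitly.)

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: read the running spec's flow execution ID without
// risking a null dereference, and include it in the warning text.
final class CancelLogSketch {
  // Placeholder key for illustration; the real constant is
  // ConfigurationKeys.FLOW_EXECUTION_ID_KEY.
  static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

  // Returns the ID stored in the running spec's config, or empty if absent.
  static Optional<String> runningFlowExecutionId(Map<String, String> specConfig) {
    return Optional.ofNullable(specConfig.get(FLOW_EXECUTION_ID_KEY));
  }

  // Builds a warning that shows both the requested and the actual ID.
  static String mismatchMessage(String specUri, String requestedId, Optional<String> actualId) {
    return String.format(
        "Not cancelling job spec %s: requested flow execution ID %s, running spec has %s",
        specUri, requestedId, actualId.orElse("<absent>"));
  }
}
```

With this shape, the warning names the ID found on the running spec, and an absent ID is reported as `<absent>` instead of failing.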
Issue Time Tracking
-------------------
Worklog Id: (was: 861304)
Time Spent: 0.5h (was: 20m)
> Use Flow Execution ID in Gobblin cluster cancellation semantics and jobname
> IDs if possible
> -------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1831
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1831
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: William Lo
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> When GaaS executes jobs on a Gobblin cluster, the flow execution ID of a
> request can fail to match the job actually running on the cluster.
> To address this, we tried adding the FlowExecutionId to the jobSpec, but that
> meant that jobs could run concurrently on the Gobblin cluster when they
> should have been deduped.
> So instead, during cancellation, we want to check whether the incoming spec
> has a flow execution ID. If so, the existing job is cancelled only if the
> flow execution IDs match. If they do not match, the current job does not
> correspond to the incoming request and should not be deleted.
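
The cancellation rule described above can be sketched as a small decision function. This is a minimal illustration under stated assumptions, not the project's implementation: `CancelDecisionSketch` and `shouldCancel` are hypothetical names, and treating a running job with no flow execution ID as a non-match is an assumption the description does not spell out.

```java
import java.util.Optional;

// Hypothetical sketch of the cancellation decision described in the issue.
final class CancelDecisionSketch {
  // incomingId: flow execution ID carried by the cancel request, if any.
  // runningId:  flow execution ID of the job currently on the cluster, if any.
  static boolean shouldCancel(Optional<String> incomingId, Optional<String> runningId) {
    if (!incomingId.isPresent()) {
      // No ID on the request: fall back to cancelling by spec URI alone.
      return true;
    }
    // ID on the request: cancel only if the running job carries the same ID.
    // Assumption: a running job without an ID counts as a mismatch.
    return runningId.isPresent() && runningId.get().equals(incomingId.get());
  }
}
```

For example, a request carrying ID `123` would cancel a running job with ID `123` but leave a job with ID `456` untouched.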
--
This message was sent by Atlassian Jira
(v8.20.10#820010)