[ https://issues.apache.org/jira/browse/GOBBLIN-1437?focusedWorklogId=589964&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-589964 ]
ASF GitHub Bot logged work on GOBBLIN-1437:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Apr/21 19:05
Start Date: 27/Apr/21 19:05
Worklog Time Spent: 10m
Work Description: umustafi commented on a change in pull request #3268:
URL: https://github.com/apache/gobblin/pull/3268#discussion_r621514135
##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -365,38 +365,42 @@ public void remove(Spec spec, Properties headers) throws IOException {
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
if (spec instanceof FlowSpec) {
+ //Send the dag to the DagManager to stop it.
+ //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
if (this.dagManager.isPresent()) {
- //Send the dag to the DagManager.
_log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
this.dagManager.get().stopDag(spec.getUri());
- } else {
- // If DagManager is not enabled, we need to recompile the flow to find the spec producer,
- // If compilation results is different, it remove request can go to some different spec producer
- Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
- if (jobExecutionPlanDag.isEmpty()) {
- _log.warn("Cannot determine an executor to delete Spec: " + spec);
- return;
- }
-
- // Delete all compiled JobSpecs on their respective Executor
- for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) {
- JobExecutionPlan jobExecutionPlan = dagNode.getValue();
- Spec jobSpec = jobExecutionPlan.getJobSpec();
- try {
- SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
- _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
- producer.deleteSpec(jobSpec.getUri(), headers);
- } catch (Exception e) {
- _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e);
- }
- }
}
+ // We need to recompile the flow to find the spec producer,
+ // If compilation results is different, it remove request can go to some different spec producer
Review comment:
Small nitpick here: "it" should be "it's".
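For readers following the hunk, the control flow it appears to move toward can be sketched as below. This is only an illustration of the visible part of the diff: the DAG is stopped when a DagManager is present, and the flow is then recompiled and deleted on each producer regardless, instead of only in the else branch. The types `DagStopper`, `JobDeleter`, `PlannedJob`, `FlowCompiler` and `RemoveFlowSketch` are hypothetical stand-ins, not Gobblin's real classes, and the headers argument is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; NOT Gobblin's real interfaces.
interface DagStopper { void stopDag(String flowUri); }
interface JobDeleter { void deleteSpec(String jobUri) throws Exception; }
interface FlowCompiler { List<PlannedJob> compileFlow(String flowUri); }

// One compiled job together with the producer that would receive its delete.
final class PlannedJob {
  final String jobUri;
  final JobDeleter producer;

  PlannedJob(String jobUri, JobDeleter producer) {
    this.jobUri = jobUri;
    this.producer = producer;
  }
}

final class RemoveFlowSketch {
  private final Optional<DagStopper> dagManager;
  private final FlowCompiler compiler;

  RemoveFlowSketch(Optional<DagStopper> dagManager, FlowCompiler compiler) {
    this.dagManager = dagManager;
    this.compiler = compiler;
  }

  /** Returns the URIs of the jobs whose delete call succeeded. */
  List<String> remove(String flowUri) {
    // Step 1: if a DagManager is configured, ask it to stop the running DAG.
    dagManager.ifPresent(dm -> dm.stopDag(flowUri));

    // Step 2: no longer inside an else branch. Recompile the flow to find
    // the producers, then ask each one to delete its job.
    List<String> deleted = new ArrayList<>();
    List<PlannedJob> jobs = compiler.compileFlow(flowUri);
    if (jobs.isEmpty()) {
      return deleted; // no executor could be determined
    }
    for (PlannedJob job : jobs) {
      try {
        job.producer.deleteSpec(job.jobUri);
        deleted.add(job.jobUri);
      } catch (Exception e) {
        // A failure on one producer does not abort the remaining deletes.
      }
    }
    return deleted;
  }
}
```

As in the removed block, the sketch depends on recompilation producing the same plan as the original execution; if it does not, the delete may reach a different producer.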
Issue Time Tracking
-------------------
Worklog Id: (was: 589964)
Remaining Estimate: 0h
Time Spent: 10m
> Segregate FlowConfigs/Delete and FlowExecutions/Delete functions #3268
> ----------------------------------------------------------------------
>
> Key: GOBBLIN-1437
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1437
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> There is some mix-up of responsibilities between FlowConfigs/Delete and
> FlowExecutions/Delete, which needs to be fixed.
> FlowConfigs/Delete
> 1) does not cancel the currently running execution on the cluster (i.e. when
> KafkaSpecProducer is used)
> 2) does not delete the state store when both DagManager and
> KafkaSpecProducer are used
> FlowExecutions/Delete
> 1) does not call SpecProducer.cancel()
> 2) deletes the state store when KafkaSpecProducer is used, because the
> cluster thinks it is a DELETE request
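
Read as a specification, the four bullets above suggest the following split of responsibilities. This is a minimal sketch of that reading, not code from the pull request: `RequestKind`, `CleanupAction` and `DeleteSemanticsSketch` are hypothetical names introduced only for illustration.

```java
import java.util.EnumSet;

// Hypothetical names for illustration; none of these exist in Gobblin.
enum RequestKind { FLOW_CONFIG_DELETE, FLOW_EXECUTION_DELETE }

enum CleanupAction { CANCEL_RUNNING_EXECUTION, DELETE_STATE_STORE }

final class DeleteSemanticsSketch {
  private DeleteSemanticsSketch() {}

  /** Maps each request kind to the actions the issue implies it should trigger. */
  static EnumSet<CleanupAction> actionsFor(RequestKind kind) {
    switch (kind) {
      case FLOW_CONFIG_DELETE:
        // Deleting the config should both cancel a running execution
        // and remove the state store.
        return EnumSet.of(CleanupAction.CANCEL_RUNNING_EXECUTION,
                          CleanupAction.DELETE_STATE_STORE);
      case FLOW_EXECUTION_DELETE:
        // Deleting an execution should only cancel it; the state store stays.
        return EnumSet.of(CleanupAction.CANCEL_RUNNING_EXECUTION);
      default:
        throw new IllegalArgumentException("Unknown request kind: " + kind);
    }
  }
}
```

Under this reading, the second FlowExecutions/Delete bullet is the case where a cancel request is treated as a DELETE and the state store is removed when it should be kept.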