This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new c908962 [GOBBLIN-1135] added back flow remove feature for spec executors when dag manager is not enabled codesyle changes c908962 is described below commit c908962e87cfd42c54da23f9160b460d277bb00e Author: Arjun <ab...@linkedin.com> AuthorDate: Thu Apr 30 16:55:11 2020 -0700 [GOBBLIN-1135] added back flow remove feature for spec executors when dag manager is not enabled codesyle changes Closes #2974 from arjun4084346/deleteClusterJobs --- .../apache/gobblin/runtime/api/SpecExecutor.java | 2 +- .../modules/orchestration/AzkabanSpecExecutor.java | 2 +- .../gobblin/service/SimpleKafkaSpecExecutor.java | 2 +- .../AbstractSpecExecutor.java | 3 ++- .../InMemorySpecExecutor.java | 4 ++-- .../spec_executorInstance/LocalFsSpecExecutor.java | 4 ++-- .../spec_executorInstance/MockedSpecExecutor.java | 4 ++-- .../modules/orchestration/Orchestrator.java | 22 +++++++++++++++++++++- .../modules/flow/MultiHopFlowCompilerTest.java | 2 +- 9 files changed, 33 insertions(+), 12 deletions(-) diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java index 85ee7af..8569275 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java @@ -53,7 +53,7 @@ public interface SpecExecutor { /** A communication socket for generating spec to assigned physical executors, paired with * a consumer on the physical executor side. */ - Future<? extends SpecProducer> getProducer(); + Future<? 
extends SpecProducer<Spec>> getProducer(); public static enum Verb { ADD(1, "add"), diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java index 0e6c4a0..bb970d9 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java @@ -70,7 +70,7 @@ public class AzkabanSpecExecutor extends AbstractSpecExecutor { @Override - public Future<? extends SpecProducer> getProducer() { + public Future<? extends SpecProducer<Spec>> getProducer() { return new CompletedFuture<>(this.azkabanSpecProducer, null); } diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java index c3dfcb3..29e735e 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java @@ -58,7 +58,7 @@ public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor { } @Override - public Future<? extends SpecProducer> getProducer() { + public Future<? 
extends SpecProducer<Spec>> getProducer() { return new CompletedFuture<>(this.specProducer, null); } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java index e0a235c..ea8c497 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java @@ -31,6 +31,7 @@ import com.google.common.io.Closer; import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; +import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecConsumer; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.util.ConfigUtils; @@ -182,7 +183,7 @@ public abstract class AbstractSpecExecutor extends AbstractIdleService implement abstract protected void shutDown() throws Exception; - abstract public Future<? extends SpecProducer> getProducer(); + abstract public Future<? extends SpecProducer<Spec>> getProducer(); abstract public Future<String> getDescription(); } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java index e0be4e9..4f3c899 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java @@ -76,8 +76,8 @@ public class InMemorySpecExecutor extends AbstractSpecExecutor { } @Override - public Future<? extends SpecProducer> getProducer(){ - return new CompletedFuture(this.inMemorySpecProducer, null); + public Future<? 
extends SpecProducer<Spec>> getProducer(){ + return new CompletedFuture<>(this.inMemorySpecProducer, null); } @Override diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java index 4d49ed0..c6c9536 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/LocalFsSpecExecutor.java @@ -52,8 +52,8 @@ public class LocalFsSpecExecutor extends AbstractSpecExecutor { } @Override - public Future<? extends SpecProducer> getProducer(){ - return new CompletedFuture(this.specProducer, null); + public Future<? extends SpecProducer<Spec>> getProducer(){ + return new CompletedFuture<>(this.specProducer, null); } @Override diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java index 4a31a6e..5cfc53b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java @@ -54,7 +54,7 @@ public class MockedSpecExecutor extends InMemorySpecExecutor { } @Override - public Future<? extends SpecProducer> getProducer(){ - return new CompletedFuture(this.mockedSpecProducer, null); + public Future<? 
extends SpecProducer<Spec>> getProducer(){ + return new CompletedFuture<>(this.mockedSpecProducer, null); } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index cefd513..d545f9c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -351,7 +351,27 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri()); this.dagManager.get().stopDag(spec.getUri()); } else { - _log.warn("Operation not supported."); + // If DagManager is not enabled, we need to recompile the flow to find the spec producer. + // If the compilation result is different, the remove request can go to a different spec producer. + Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec); + + if (jobExecutionPlanDag.isEmpty()) { + _log.warn("Cannot determine an executor to delete Spec: " + spec); + return; + } + + // Delete all compiled JobSpecs on their respective Executor + for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) { + JobExecutionPlan jobExecutionPlan = dagNode.getValue(); + Spec jobSpec = jobExecutionPlan.getJobSpec(); + try { + SpecProducer<Spec> producer = jobExecutionPlan.getSpecExecutor().getProducer().get(); + _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer)); + producer.deleteSpec(jobSpec.getUri(), headers); + } catch (Exception e) { + _log.error(String.format("Could not delete JobSpec: %s for flow: %s", jobSpec, spec), e); + } + } } } else { throw new RuntimeException("Spec not of type FlowSpec, cannot delete: " + spec); diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index c03aa39..a99928e 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -761,7 +761,7 @@ public class MultiHopFlowCompilerTest { } @Override - public Future<? extends SpecProducer> getProducer() { + public Future<? extends SpecProducer<Spec>> getProducer() { return new CompletedFuture<>(this.azkabanSpecProducer, null); }