Repository: incubator-gobblin Updated Branches: refs/heads/master 95eff37ce -> c103a8f6a
[GOBBLIN-614][GOBBLIN-599][GOBBLIN-598][GOBBLIN-615][GOBBLIN-616] Allow multiple flow failure options in DagManager. Closes #2481 from sv2000/retention Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c103a8f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c103a8f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c103a8f6 Branch: refs/heads/master Commit: c103a8f6a6fc09926fdb2b62ead516b63a4ba9aa Parents: 95eff37 Author: suvasude <[email protected]> Authored: Tue Oct 23 13:35:42 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Oct 23 13:35:42 2018 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 1 + .../modules/orchestration/DagManager.java | 142 ++++++++---- .../modules/orchestration/DagManagerUtils.java | 54 +++-- .../service/modules/spec/JobExecutionPlan.java | 7 +- .../modules/orchestration/DagManagerTest.java | 218 +++++++++++-------- .../orchestration/DagManagerUtilsTest.java | 117 ---------- 6 files changed, 268 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 40d2f44..ffc2376 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -132,6 +132,7 @@ public class ConfigurationKeys { public static final String FLOW_GROUP_KEY = "flow.group"; public static final 
String FLOW_DESCRIPTION_KEY = "flow.description"; public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId"; + public static final String FLOW_FAILURE_OPTION = "flow.failureOption"; /** * Common topology configuration properties. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index ac304fb..2d0c555 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -50,12 +50,18 @@ import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.monitoring.JobStatus; import org.apache.gobblin.service.monitoring.JobStatusRetriever; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; +import static org.apache.gobblin.service.ExecutionStatus.COMPLETE; +import static org.apache.gobblin.service.ExecutionStatus.FAILED; +import static org.apache.gobblin.service.ExecutionStatus.RUNNING; +import static org.apache.gobblin.service.ExecutionStatus.valueOf; + /** * This class implements a manager to manage the life cycle of a {@link Dag}. 
A {@link Dag} is submitted to the @@ -78,16 +84,43 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @Alpha @Slf4j public class DagManager extends AbstractIdleService { + public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name(); + private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10; private static final Integer DEFAULT_NUM_THREADS = 3; private static final Integer TERMINATION_TIMEOUT = 30; + private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager."; + private static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads"; + private static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval"; + private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever"; + private static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass"; + + static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + "dagStateStoreDir"; - public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager."; - public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads"; - public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval"; - public static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever"; - public static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass"; - public static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + "dagStateStoreDir"; + /** + * Action to be performed on a {@link Dag}, in case of a job failure. 
Currently, we allow 2 modes (CANCEL is also declared below, but it is currently treated the same as FINISH_ALL_POSSIBLE): + * <ul> + * <li> FINISH_RUNNING, which allows currently running jobs to finish.</li> + * <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies + * of the job are successful.</li> + * </ul> + */ + public enum FailureOption { + FINISH_RUNNING("FINISH_RUNNING"), + CANCEL("CANCEL"), + FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE"); + + private final String failureOption; + + FailureOption(final String failureOption) { + this.failureOption = failureOption; + } + + @Override + public String toString() { + return this.failureOption; + } + } private BlockingQueue<Dag<JobExecutionPlan>> queue; private ScheduledExecutorService scheduledExecutorPool; @@ -138,7 +171,7 @@ public class DagManager extends AbstractIdleService { * submitted dag to the {@link DagStateStore} and then adds the dag to a {@link BlockingQueue} to be picked up * by one of the {@link DagManagerThread}s. */ - public synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException { + synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException { //Persist the dag this.dagStateStore.writeCheckpoint(dag); //Add it to the queue of dags @@ -175,10 +208,11 @@ public class DagManager extends AbstractIdleService { * </ol> */ public static class DagManagerThread implements Runnable { - private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); + private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>(); - private final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); - private final Set<String> failedDagIds = new HashSet<>(); + private final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); + private final Set<String> failedDagIdsFinishRunning = new HashSet<>(); + private final Set<String> 
failedDagIdsFinishAllPossible = new HashSet<>(); private final MetricContext metricContext; private final Optional<EventSubmitter> eventSubmitter; @@ -189,8 +223,8 @@ public class DagManager extends AbstractIdleService { /** * Constructor. */ - public DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, BlockingQueue<Dag<JobExecutionPlan>> queue, - boolean instrumentationEnabled) { + DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, + BlockingQueue<Dag<JobExecutionPlan>> queue, boolean instrumentationEnabled) { this.jobStatusRetriever = jobStatusRetriever; this.dagStateStore = dagStateStore; this.queue = queue; @@ -204,7 +238,8 @@ public class DagManager extends AbstractIdleService { } /** - * Main body of the {@link DagManagerThread}. + * Main body of the {@link DagManagerThread}. Deque the next item from the queue and poll job statuses of currently + * running jobs. */ @Override public void run() { @@ -213,7 +248,9 @@ public class DagManager extends AbstractIdleService { //Poll the queue for a new Dag to execute. if (nextItem != null) { Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) nextItem; - + if (dag.isEmpty()) { + log.info("Empty dag; ignoring the dag"); + } //Initialize dag. initialize(dag); } @@ -251,18 +288,18 @@ public class DagManager extends AbstractIdleService { } this.dags.put(dagId, dag); - log.info("Dag {} - determining if any jobs are already running."); + log.info("Dag {} - determining if any jobs are already running.", dagId); //Are there any jobs already in the running state? This check is for Dags already running //before a leadership change occurs. 
- for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) { - if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) { + for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) { + if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) { addJobState(dagId, dagNode); } } - log.info("Dag {} submitting jobs ready for execution."); + log.info("Dag {} submitting jobs ready for execution.", dagId); //Determine the next set of jobs to run and submit them for execution submitNext(dagId); - log.info("Dag {} Initialization complete."); + log.info("Dag {} Initialization complete.", dagId); } /** @@ -271,8 +308,8 @@ public class DagManager extends AbstractIdleService { */ private void pollJobStatuses() throws IOException { - this.failedDagIds.clear(); - for (Dag.DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) { + this.failedDagIdsFinishRunning.clear(); + for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) { TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent() ? eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.JOB_STATUS_POLLED) : null; @@ -282,19 +319,21 @@ public class DagManager extends AbstractIdleService { } Preconditions.checkNotNull(jobStatus, "Received null job status for a running job " + DagManagerUtils.getJobName(node)); JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node); - //TODO: This will be updated when JobStatus schema provides the correct execution status. - //Currently, it is a placeholder. 
- switch (jobStatus.getEventName()) { - case TimingEvent.LauncherTimings.JOB_COMPLETE: - jobExecutionPlan.setExecutionStatus(ExecutionStatus.COMPLETE); + + ExecutionStatus status = valueOf(jobStatus.getEventName()); + + switch (status) { + case COMPLETE: + jobExecutionPlan.setExecutionStatus(COMPLETE); onJobFinish(node); break; - case TimingEvent.LauncherTimings.JOB_FAILED: - jobExecutionPlan.setExecutionStatus(ExecutionStatus.FAILED); + case FAILED: + case CANCELLED: + jobExecutionPlan.setExecutionStatus(FAILED); onJobFinish(node); break; default: - jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING); + jobExecutionPlan.setExecutionStatus(RUNNING); break; } } @@ -303,7 +342,7 @@ public class DagManager extends AbstractIdleService { /** * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}. */ - private JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) { + private JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode) { Config jobConfig = dagNode.getValue().getJobSpec().getConfig(); String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); @@ -320,11 +359,11 @@ public class DagManager extends AbstractIdleService { } } - public void submitNext(String dagId) - throws IOException { + void submitNext(String dagId) throws IOException { Dag<JobExecutionPlan> dag = this.dags.get(dagId); + Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag); //Submit jobs from the dag ready for execution. - for (Dag.DagNode<JobExecutionPlan> dagNode : DagManagerUtils.getNext(dag)) { + for (DagNode<JobExecutionPlan> dagNode : nextNodes) { submitJob(dagNode); addJobState(dagId, dagNode); } @@ -335,9 +374,9 @@ public class DagManager extends AbstractIdleService { /** * Submits a {@link JobSpec} to a {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
*/ - private void submitJob(Dag.DagNode<JobExecutionPlan> dagNode) { + private void submitJob(DagNode<JobExecutionPlan> dagNode) { JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode); - jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING); + jobExecutionPlan.setExecutionStatus(RUNNING); JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode); // Run this spec on selected executor @@ -359,7 +398,7 @@ public class DagManager extends AbstractIdleService { * Method that defines the actions to be performed when a job finishes either successfully or with failure. * This method updates the state of the dag and performs clean up actions as necessary. */ - private void onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode) + private void onJobFinish(DagNode<JobExecutionPlan> dagNode) throws IOException { Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode); String dagId = DagManagerUtils.generateDagId(dag); @@ -369,25 +408,29 @@ public class DagManager extends AbstractIdleService { deleteJobState(dagId, dagNode); - if (jobStatus == ExecutionStatus.COMPLETE) { + if (jobStatus == COMPLETE) { submitNext(dagId); - } else if (jobStatus == ExecutionStatus.FAILED) { - this.failedDagIds.add(dagId); + } else if (jobStatus == FAILED) { + if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) { + this.failedDagIdsFinishRunning.add(dagId); + } else { + this.failedDagIdsFinishAllPossible.add(dagId); + } } } - private void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) { + private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) { this.jobToDag.remove(dagNode); this.dagToJobs.get(dagId).remove(dagNode); } - private void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) { + private void addJobState(String dagId, DagNode<JobExecutionPlan> dagNode) { Dag<JobExecutionPlan> dag = this.dags.get(dagId); this.jobToDag.put(dagNode, dag); if (this.dagToJobs.containsKey(dagId)) { 
this.dagToJobs.get(dagId).add(dagNode); } else { - LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = Lists.newLinkedList(); + LinkedList<DagNode<JobExecutionPlan>> dagNodeList = Lists.newLinkedList(); dagNodeList.add(dagNode); this.dagToJobs.put(dagId, dagNodeList); } @@ -402,21 +445,26 @@ public class DagManager extends AbstractIdleService { */ private void cleanUp() { //Clean up failed dags - for (String dagId : this.failedDagIds) { + for (String dagId : this.failedDagIdsFinishRunning) { //Skip monitoring of any other jobs of the failed dag. - LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId); + LinkedList<DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId); while (!dagNodeList.isEmpty()) { - Dag.DagNode<JobExecutionPlan> dagNode = dagNodeList.poll(); + DagNode<JobExecutionPlan> dagNode = dagNodeList.poll(); deleteJobState(dagId, dagNode); } log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId); cleanUpDag(dagId); } - //Clean up successfully completed dags + //Clean up completed dags for (String dagId : this.dags.keySet()) { if (!hasRunningJobs(dagId)) { - log.info("Dag {} has finished with status COMPLETE; Cleaning up dag from the state store.", dagId); + String status = "COMPLETE"; + if (this.failedDagIdsFinishAllPossible.contains(dagId)) { + status = "FAILED"; + this.failedDagIdsFinishAllPossible.remove(dagId); + } + log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status); cleanUpDag(dagId); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java index 348aa78..9ac51fe 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java @@ -31,7 +31,11 @@ import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; +import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; + public class DagManagerUtils { /** @@ -39,7 +43,7 @@ public class DagManagerUtils { * @param dag instance of a {@link Dag}. * @return a String id associated corresponding to the {@link Dag} instance. 
*/ - public static String generateDagId(Dag<JobExecutionPlan> dag) { + static String generateDagId(Dag<JobExecutionPlan> dag) { Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig(); String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); @@ -47,28 +51,28 @@ public class DagManagerUtils { return Joiner.on("_").join(flowGroup, flowName, flowExecutionId); } - public static String getJobName(Dag.DagNode<JobExecutionPlan> dagNode) { + static String getJobName(DagNode<JobExecutionPlan> dagNode) { return dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY); } - public static JobExecutionPlan getJobExecutionPlan(Dag.DagNode<JobExecutionPlan> dagNode) { + static JobExecutionPlan getJobExecutionPlan(DagNode<JobExecutionPlan> dagNode) { return dagNode.getValue(); } - public static JobSpec getJobSpec(Dag.DagNode<JobExecutionPlan> dagNode) { + public static JobSpec getJobSpec(DagNode<JobExecutionPlan> dagNode) { return dagNode.getValue().getJobSpec(); } - public static Config getJobConfig(Dag.DagNode<JobExecutionPlan> dagNode) { + static Config getJobConfig(DagNode<JobExecutionPlan> dagNode) { return dagNode.getValue().getJobSpec().getConfig(); } - public static SpecProducer getSpecProducer(Dag.DagNode<JobExecutionPlan> dagNode) + static SpecProducer getSpecProducer(DagNode<JobExecutionPlan> dagNode) throws ExecutionException, InterruptedException { return dagNode.getValue().getSpecExecutor().getProducer().get(); } - public static ExecutionStatus getExecutionStatus(Dag.DagNode<JobExecutionPlan> dagNode) { + static ExecutionStatus getExecutionStatus(DagNode<JobExecutionPlan> dagNode) { return dagNode.getValue().getExecutionStatus(); } @@ -77,17 +81,19 @@ public class DagManagerUtils { * identifies each node yet to be executed and for which each of its parent nodes is in the {@link ExecutionStatus#COMPLETE} * state. 
*/ - public static Set<Dag.DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) { - Set<Dag.DagNode<JobExecutionPlan>> nextNodesToExecute = new HashSet<>(); - LinkedList<Dag.DagNode<JobExecutionPlan>> nodesToExpand = Lists.newLinkedList(dag.getStartNodes()); + static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) { + Set<DagNode<JobExecutionPlan>> nextNodesToExecute = new HashSet<>(); + LinkedList<DagNode<JobExecutionPlan>> nodesToExpand = Lists.newLinkedList(dag.getStartNodes()); + FailureOption failureOption = getFailureOption(dag); while (!nodesToExpand.isEmpty()) { - Dag.DagNode<JobExecutionPlan> node = nodesToExpand.poll(); + DagNode<JobExecutionPlan> node = nodesToExpand.poll(); + ExecutionStatus executionStatus = getExecutionStatus(node); boolean addFlag = true; - if (getExecutionStatus(node) == ExecutionStatus.$UNKNOWN) { + if (executionStatus == ExecutionStatus.$UNKNOWN) { //Add a node to be executed next, only if all of its parent nodes are COMPLETE. - List<Dag.DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node); - for (Dag.DagNode<JobExecutionPlan> parentNode : parentNodes) { + List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node); + for (DagNode<JobExecutionPlan> parentNode : parentNodes) { if (getExecutionStatus(parentNode) != ExecutionStatus.COMPLETE) { addFlag = false; break; @@ -96,14 +102,28 @@ public class DagManagerUtils { if (addFlag) { nextNodesToExecute.add(node); } - } else if (getExecutionStatus(node) == ExecutionStatus.COMPLETE) { + } else if (executionStatus == ExecutionStatus.COMPLETE) { //Explore the children of COMPLETED node as next candidates for execution. 
nodesToExpand.addAll(dag.getChildren(node)); - } else { - return new HashSet<>(); + } else if ((executionStatus == ExecutionStatus.FAILED) || (executionStatus == ExecutionStatus.CANCELLED)) { + switch (failureOption) { + case FINISH_RUNNING: + return new HashSet<>(); + case FINISH_ALL_POSSIBLE: + default: + break; + } } } return nextNodesToExecute; } + static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) { + if (dag.isEmpty()) { + return null; + } + DagNode<JobExecutionPlan> dagNode = dag.getStartNodes().get(0); + String failureOption = ConfigUtils.getString(getJobConfig(dagNode), ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION); + return FailureOption.valueOf(failureOption); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index 023e104..5e9f86c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -35,6 +35,7 @@ import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.util.ConfigUtils; @@ -71,6 +72,7 @@ public class JobExecutionPlan { String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, ""); String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, ""); String jobName = 
ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); + String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION); //Modify the job name to include the flow group, flow name and a randomly generated integer to make the job name unique. jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, random.nextInt(Integer.MAX_VALUE)); @@ -101,8 +103,9 @@ public class JobExecutionPlan { jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef(jobName))); jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))); - //Enable job lock for each job to prevent concurrent executions. - jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))); + //Add flow failure option + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, + ConfigValueFactory.fromAnyRef(flowFailureOption))); // Reset properties in Spec from Config jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig())); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java index f7aaaed..f23ece1 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java @@ -17,6 +17,7 @@ package 
org.apache.gobblin.service.modules.orchestration; import java.io.File; +import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; @@ -35,17 +36,19 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; import org.apache.gobblin.service.monitoring.JobStatus; @@ -58,8 +61,8 @@ public class DagManagerTest { private JobStatusRetriever _jobStatusRetriever; private DagManager.DagManagerThread _dagManagerThread; private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue; - private Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag; - private Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs; + private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag; + private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs; private Map<String, Dag<JobExecutionPlan>> dags; @BeforeClass @@ -74,11 +77,11 @@ public class DagManagerTest { Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag"); jobToDagField.setAccessible(true); - this.jobToDag = 
(Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>) jobToDagField.get(this._dagManagerThread); + this.jobToDag = (Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>) jobToDagField.get(this._dagManagerThread); Field dagToJobsField = DagManager.DagManagerThread.class.getDeclaredField("dagToJobs"); dagToJobsField.setAccessible(true); - this.dagToJobs = (Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>>) dagToJobsField.get(this._dagManagerThread); + this.dagToJobs = (Map<String, LinkedList<DagNode<JobExecutionPlan>>>) dagToJobsField.get(this._dagManagerThread); Field dagsField = DagManager.DagManagerThread.class.getDeclaredField("dags"); dagsField.setAccessible(true); @@ -87,12 +90,13 @@ public class DagManagerTest { } /** - * Create a {@link Dag < JobExecutionPlan >} with one parent and remaining nodes as its children. + * Create a {@link Dag <JobExecutionPlan>}. * @return a Dag. */ - public Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, int numNodes) + private Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String flowFailureOption, boolean flag) throws URISyntaxException { List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + int numNodes = (flag) ? 3 : 5; for (int i = 0; i < numNodes; i++) { String suffix = Integer.toString(i); Config jobConfig = ConfigBuilder.create(). @@ -100,9 +104,14 @@ public class DagManagerTest { addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id). addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId). addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id). - addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build(); - if (i > 0) { + addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix). 
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption).build(); + if ((i == 1) || (i == 2)) { jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0")); + } else if (i == 3) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job1")); + } else if (i == 4) { + jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job2")); } JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). withTemplate(new URI("job" + suffix)).build(); @@ -119,28 +128,27 @@ public class DagManagerTest { } @Test - public void testSuccessfulDag() throws URISyntaxException { + public void testSuccessfulDag() throws URISyntaxException, IOException { long flowExecutionId = System.currentTimeMillis(); String flowGroupId = "0"; String flowGroup = "group" + flowGroupId; String flowName = "flow" + flowGroupId; - String jobGroup = flowGroup; String jobName0 = "job0"; String jobName1 = "job1"; String jobName2 = "job2"; - Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 3); + Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, "FINISH_RUNNING", true); String dagId = DagManagerUtils.generateDagId(dag); //Add a dag to the queue of dags this.queue.offer(dag); - Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_RUN); - Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_COMPLETE); - Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); - Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_RUN); - 
Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); - Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_COMPLETE); - Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_COMPLETE); + Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName0, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName0, String.valueOf(ExecutionStatus.COMPLETE)); + Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName2, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName2, String.valueOf(ExecutionStatus.COMPLETE)); + Iterator<JobStatus> jobStatusIterator7 = getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, String.valueOf(ExecutionStatus.COMPLETE)); Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())). 
@@ -195,78 +203,112 @@ public class DagManagerTest { Assert.assertEquals(this.dags.size(), 0); Assert.assertEquals(this.jobToDag.size(), 0); Assert.assertEquals(this.dagToJobs.size(), 0); + Assert.assertEquals(this._dagStateStore.getDags().size(), 0); } @Test (dependsOnMethods = "testSuccessfulDag") - public void testFailedDag() throws URISyntaxException { - long flowExecutionId = System.currentTimeMillis(); - String flowGroupId = "0"; - String flowGroup = "group" + flowGroupId; - String flowName = "flow" + flowGroupId; - String jobGroup = flowGroup; - String jobName0 = "job0"; - String jobName1 = "job1"; - String jobName2 = "job2"; - - Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 3); - String dagId = DagManagerUtils.generateDagId(dag); - - //Add a dag to the queue of dags - this.queue.offer(dag); - Iterator<JobStatus> jobStatusIterator1 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_START); - Iterator<JobStatus> jobStatusIterator2 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName0, TimingEvent.LauncherTimings.JOB_COMPLETE); - - Iterator<JobStatus> jobStatusIterator3 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); - Iterator<JobStatus> jobStatusIterator4 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_RUN); - - Iterator<JobStatus> jobStatusIterator5 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName1, TimingEvent.LauncherTimings.JOB_RUN); - Iterator<JobStatus> jobStatusIterator6 = getMockJobStatus(flowName, flowGroup, flowExecutionId, jobGroup, jobName2, TimingEvent.LauncherTimings.JOB_FAILED); - - Mockito.when(_jobStatusRetriever.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), - Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())). - thenReturn(jobStatusIterator1). 
- thenReturn(jobStatusIterator2). - thenReturn(jobStatusIterator3). - thenReturn(jobStatusIterator4). - thenReturn(jobStatusIterator5). - thenReturn(jobStatusIterator6); - - //Run the thread once. Ensure the first job is running - this._dagManagerThread.run(); - Assert.assertEquals(this.dags.size(), 1); - Assert.assertTrue(this.dags.containsKey(dagId)); - Assert.assertEquals(this.jobToDag.size(), 1); - Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0))); - Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1); - Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0))); - - //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 are submitted. - this._dagManagerThread.run(); - Assert.assertEquals(this.dags.size(), 1); - Assert.assertTrue(this.dags.containsKey(dagId)); - Assert.assertEquals(this.jobToDag.size(), 2); - Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0))); - Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1))); - Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); - Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0))); - Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1))); - - //Run the thread 3rd time. Ensure the job0 is complete and job1 and job2 are running. - this._dagManagerThread.run(); - Assert.assertEquals(this.dags.size(), 1); - Assert.assertTrue(this.dags.containsKey(dagId)); - Assert.assertEquals(this.jobToDag.size(), 2); - Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(0))); - Assert.assertTrue(this.jobToDag.containsKey(dag.getEndNodes().get(1))); - Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); - Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(0))); - Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getEndNodes().get(1))); - - //Run the thread 4th time. 
One of the jobs is failed and so the dag is failed and all state is cleaned up. - this._dagManagerThread.run(); - Assert.assertEquals(this.dags.size(), 0); - Assert.assertEquals(this.jobToDag.size(), 0); - Assert.assertEquals(this.dagToJobs.size(), 0); + public void testFailedDag() throws URISyntaxException, IOException { + for (String failureOption: Lists.newArrayList("FINISH_RUNNING", "FINISH_ALL_POSSIBLE")) { + long flowExecutionId = System.currentTimeMillis(); + String flowGroupId = "0"; + String flowGroup = "group" + flowGroupId; + String flowName = "flow" + flowGroupId; + String jobName0 = "job0"; + String jobName1 = "job1"; + String jobName2 = "job2"; + + Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, failureOption, false); + String dagId = DagManagerUtils.generateDagId(dag); + + //Add a dag to the queue of dags + this.queue.offer(dag); + Iterator<JobStatus> jobStatusIterator1 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName0, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator2 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName0, String.valueOf(ExecutionStatus.COMPLETE)); + Iterator<JobStatus> jobStatusIterator3 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator4 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName2, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator5 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator6 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName2, String.valueOf(ExecutionStatus.FAILED)); + Iterator<JobStatus> jobStatusIterator7 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, 
String.valueOf(ExecutionStatus.COMPLETE)); + Iterator<JobStatus> jobStatusIterator8 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, String.valueOf(ExecutionStatus.RUNNING)); + Iterator<JobStatus> jobStatusIterator9 = + getMockJobStatus(flowName, flowGroup, flowExecutionId, flowGroup, jobName1, String.valueOf(ExecutionStatus.COMPLETE)); + + + Mockito.when(_jobStatusRetriever + .getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())). + thenReturn(jobStatusIterator1). + thenReturn(jobStatusIterator2). + thenReturn(jobStatusIterator3). + thenReturn(jobStatusIterator4). + thenReturn(jobStatusIterator5). + thenReturn(jobStatusIterator6). + thenReturn(jobStatusIterator7). + thenReturn(jobStatusIterator8). + thenReturn(jobStatusIterator9); + + //Run the thread once. Ensure the first job is running + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 1); + Assert.assertTrue(this.jobToDag.containsKey(dag.getStartNodes().get(0))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getStartNodes().get(0))); + + //Run the thread 2nd time. Ensure the job0 is complete and job1 and job2 are submitted. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 2); + Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1))); + Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1))); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2))); + + //Run the thread 3rd time. 
Ensure the job0 is complete and job1 and job2 are running. + this._dagManagerThread.run(); + Assert.assertEquals(this.dags.size(), 1); + Assert.assertTrue(this.dags.containsKey(dagId)); + Assert.assertEquals(this.jobToDag.size(), 2); + Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(1))); + Assert.assertTrue(this.jobToDag.containsKey(dag.getNodes().get(2))); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 2); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(1))); + Assert.assertTrue(this.dagToJobs.get(dagId).contains(dag.getNodes().get(2))); + + //Run the thread 4th time. + this._dagManagerThread.run(); + + if ("FINISH_RUNNING".equals(failureOption)) { + //One of the jobs is failed; so the dag is failed and all state is cleaned up. + Assert.assertEquals(this.dags.size(), 0); + Assert.assertEquals(this.jobToDag.size(), 0); + Assert.assertEquals(this.dagToJobs.size(), 0); + Assert.assertEquals(this._dagStateStore.getDags().size(), 0); + } else { + //One of the jobs is failed; but with finish_all_possible, some jobs can continue running. + for (int i = 0; i < 3; i++) { + Assert.assertEquals(this.dags.size(), 1); + Assert.assertEquals(this.jobToDag.size(), 1); + Assert.assertEquals(this.dagToJobs.get(dagId).size(), 1); + this._dagManagerThread.run(); + } + //Ensure the state is cleaned up. 
+ Assert.assertEquals(this.dags.size(), 0); + Assert.assertEquals(this.jobToDag.size(), 0); + Assert.assertEquals(this.dagToJobs.size(), 0); + Assert.assertEquals(this._dagStateStore.getDags().size(), 0); + } + } } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c103a8f6/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java deleted file mode 100644 index e97d860..0000000 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gobblin.service.modules.orchestration; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - -import org.junit.Assert; -import org.testng.annotations.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; - -import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; -import org.apache.gobblin.service.ExecutionStatus; -import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; - -import static org.testng.Assert.*; - - -public class DagManagerUtilsTest { - - /** - * Create a {@link Dag <JobExecutionPlan>} with 2 parents and 1 child (i.e. a V-shaped dag). - * @return a Dag. - */ - public Dag<JobExecutionPlan> buildDag() throws URISyntaxException { - List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); - Config baseConfig = ConfigBuilder.create(). - addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group0"). - addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow0"). - addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()). - addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group0").build(); - for (int i = 0; i < 3; i++) { - String suffix = Integer.toString(i); - Config jobConfig = baseConfig.withValue(ConfigurationKeys.JOB_NAME_KEY, ConfigValueFactory.fromAnyRef("job" + suffix)); - if (i == 2) { - jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0,job1")); - } - JobSpec js = JobSpec.builder("test_job" + suffix).withVersion(suffix).withConfig(jobConfig). 
- withTemplate(new URI("job" + suffix)).build(); - SpecExecutor specExecutor = InMemorySpecExecutor.createDummySpecExecutor(new URI("job" + i)); - JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor); - jobExecutionPlans.add(jobExecutionPlan); - } - return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); - } - - @Test - public void testGetNext() throws URISyntaxException { - Dag<JobExecutionPlan> dag = buildDag(); - - Set<Dag.DagNode<JobExecutionPlan>> dagNodeSet = DagManagerUtils.getNext(dag); - Assert.assertEquals(dagNodeSet.size(), 2); - - //Set 1st job to complete and 2nd job running state - JobExecutionPlan jobExecutionPlan1 = dag.getNodes().get(0).getValue(); - jobExecutionPlan1.setExecutionStatus(ExecutionStatus.COMPLETE); - JobExecutionPlan jobExecutionPlan2 = dag.getNodes().get(1).getValue(); - jobExecutionPlan2.setExecutionStatus(ExecutionStatus.RUNNING); - - //No new job to run - dagNodeSet = DagManagerUtils.getNext(dag); - Assert.assertEquals(dagNodeSet.size(), 0); - - //Set 2nd job to complete; we must have 3rd job to run next - jobExecutionPlan2.setExecutionStatus(ExecutionStatus.COMPLETE); - dagNodeSet = DagManagerUtils.getNext(dag); - Assert.assertEquals(dagNodeSet.size(), 1); - - //Set the 3rd job to running state, no new jobs to run - JobExecutionPlan jobExecutionPlan3 = dag.getNodes().get(2).getValue(); - jobExecutionPlan3.setExecutionStatus(ExecutionStatus.RUNNING); - dagNodeSet = DagManagerUtils.getNext(dag); - Assert.assertEquals(dagNodeSet.size(), 0); - - //Set the 3rd job to complete; no new jobs to run - jobExecutionPlan3.setExecutionStatus(ExecutionStatus.COMPLETE); - dagNodeSet = DagManagerUtils.getNext(dag); - Assert.assertEquals(dagNodeSet.size(), 0); - - - dag = buildDag(); - dagNodeSet = DagManagerUtils.getNext(dag); - Assert.assertEquals(dagNodeSet.size(), 2); - //Set 1st job to failed; no new jobs to run - jobExecutionPlan1 = dag.getNodes().get(0).getValue(); - 
jobExecutionPlan1.setExecutionStatus(ExecutionStatus.FAILED); - dagNodeSet = DagManagerUtils.getNext(dag); - Assert.assertEquals(dagNodeSet.size(), 0); - } - - -} \ No newline at end of file
