[GOBBLIN-590] Implement Workflow Manager in Gobblin-as-a-Service (GaaS). Closes #2456 from sv2000/workflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/71184cdb Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/71184cdb Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/71184cdb Branch: refs/heads/master Commit: 71184cdb1bb35ac5874fffd1415ca632c6ee92c0 Parents: 391615e Author: sv2000 <[email protected]> Authored: Tue Oct 2 09:23:34 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Oct 2 09:23:34 2018 -0700 ---------------------------------------------------------------------- .../gobblin/service/ServiceConfigKeys.java | 3 +- .../gobblin/metrics/event/TimingEvent.java | 5 + .../apache/gobblin/service/FlowStatusTest.java | 19 +- .../modules/core/GobblinServiceManager.java | 41 +- .../service/modules/flow/FlowGraphPath.java | 2 - .../gobblin/service/modules/flowgraph/Dag.java | 5 +- .../modules/orchestration/DagManager.java | 440 +++++++++++++++++++ .../modules/orchestration/DagManagerUtils.java | 109 +++++ .../modules/orchestration/DagStateStore.java | 53 +++ .../modules/orchestration/FSDagStateStore.java | 148 +++++++ .../modules/orchestration/Orchestrator.java | 92 ++-- .../scheduler/GobblinServiceJobScheduler.java | 14 +- .../service/modules/spec/JobExecutionPlan.java | 10 +- .../spec/JobExecutionPlanListDeserializer.java | 108 +++++ .../spec/JobExecutionPlanListSerializer.java | 90 ++++ .../modules/spec/SerializationConstants.java | 33 ++ .../core/IdentityFlowToJobSpecCompilerTest.java | 4 +- .../service/modules/flowgraph/DagTest.java | 8 +- .../modules/orchestration/DagManagerTest.java | 276 ++++++++++++ .../orchestration/DagManagerUtilsTest.java | 117 +++++ .../orchestration/FSDagStateStoreTest.java | 154 +++++++ .../modules/orchestration/OrchestratorTest.java | 2 +- 22 files changed, 1660 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java index c9b034c..255b1f7 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java @@ -32,6 +32,7 @@ public class ServiceConfigKeys { public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled"; public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled"; public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled"; + public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled"; // Helix / ServiceScheduler Keys public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name"; @@ -113,5 +114,5 @@ public class ServiceConfigKeys { public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties"; // GAAS Listerning Port public static final String SERVICE_PORT = GOBBLIN_SERVICE_PREFIX + "port"; - + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java 
index b00465f..71dd947 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -58,6 +58,11 @@ public class TimingEvent { public static final String FLOW_COMPILED = "FlowCompiled"; } + public static class JobStatusTimings { + public static final String JOB_STATUS_POLLED = "JobStatusPolled"; + public static final String ALL_JOB_STATUSES_POLLED = "AllJobStatusesPolled"; + } + public static class FlowEventConstants { public static final String FLOW_NAME_FIELD = "flowName"; public static final String FLOW_GROUP_FIELD = "flowGroup"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java index d5e857e..b8592ad 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -40,7 +41,7 @@ import org.apache.gobblin.restli.EmbeddedRestliServer; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.service.monitoring.JobStatusRetriever; -@Test(groups = { "gobblin.service" }, 
singleThreaded=true) +@Test(groups = { "gobblin.service" }, singleThreaded = true) public class FlowStatusTest { private FlowStatusClient _client; private EmbeddedRestliServer _server; @@ -51,7 +52,21 @@ public class FlowStatusTest { @Override public Iterator<org.apache.gobblin.service.monitoring.JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) { - return _listOfJobStatusLists.get((int)flowExecutionId).iterator(); + return _listOfJobStatusLists.get((int) flowExecutionId).iterator(); + } + + @Override + public Iterator<org.apache.gobblin.service.monitoring.JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, + long flowExecutionId, String jobGroup, String jobName) { + Iterator<org.apache.gobblin.service.monitoring.JobStatus> jobStatusIterator = getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId); + List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = new ArrayList<>(); + while (jobStatusIterator.hasNext()) { + org.apache.gobblin.service.monitoring.JobStatus jobStatus = jobStatusIterator.next(); + if (jobStatus.getJobGroup().equals(jobGroup) && jobStatus.getJobName().equals(jobName)) { + jobStatusList.add(jobStatus); + } + } + return jobStatusList.iterator(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 8d934ce..243f7e6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -85,6 +85,7 @@ 
import org.apache.gobblin.service.FlowConfigsV2Resource; import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler; import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; @@ -116,6 +117,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri protected final boolean isRestLIServerEnabled; protected final boolean isTopologySpecFactoryEnabled; protected final boolean isGitConfigMonitorEnabled; + protected final boolean isDagManagerEnabled; protected TopologyCatalog topologyCatalog; @Getter @@ -138,6 +140,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri protected GitConfigMonitor gitConfigMonitor; + protected DagManager dagManager; + @Getter protected Config config; private final MetricContext metricContext; @@ -159,8 +163,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName); this.fs = buildFileSystem(config); - this.serviceWorkDir = serviceWorkDirOptional.isPresent() ? serviceWorkDirOptional.get() : - getServiceWorkDirPath(this.fs, serviceName, serviceId); + this.serviceWorkDir = serviceWorkDirOptional.isPresent() ? 
serviceWorkDirOptional.get() + : getServiceWorkDirPath(this.fs, serviceName, serviceId); // Initialize TopologyCatalog this.isTopologyCatalogEnabled = ConfigUtils.getBoolean(config, @@ -182,14 +186,24 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false); + this.isDagManagerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false); + if (this.isGitConfigMonitorEnabled) { this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog); this.serviceLauncher.addService(this.gitConfigMonitor); } + + if (this.isDagManagerEnabled) { + this.dagManager = new DagManager(config); + this.serviceLauncher.addService(this.dagManager); + } } else { this.isGitConfigMonitorEnabled = false; + this.isDagManagerEnabled = false; } + + // Initialize Helix Optional<String> zkConnectionString = Optional.fromNullable(ConfigUtils.getString(config, ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, null)); @@ -206,7 +220,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri this.isSchedulerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true); if (isSchedulerEnabled) { - this.orchestrator = new Orchestrator(config, Optional.of(this.topologyCatalog), Optional.of(LOGGER)); + this.orchestrator = new Orchestrator(config, Optional.of(this.topologyCatalog), Optional.fromNullable(this.dagManager), Optional.of(LOGGER)); SchedulerService schedulerService = new SchedulerService(properties); this.scheduler = new GobblinServiceJobScheduler(this.serviceName, config, this.helixManager, @@ -327,6 +341,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri if (this.isGitConfigMonitorEnabled) { this.gitConfigMonitor.setActive(true); } + + if (this.isDagManagerEnabled) { + 
this.dagManager.setActive(true); + } } else if (this.helixManager.isPresent()) { LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(), this.helixManager.get().isLeader()); @@ -338,6 +356,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri if (this.isGitConfigMonitorEnabled) { this.gitConfigMonitor.setActive(false); } + + if (this.isDagManagerEnabled) { + this.dagManager.setActive(false); + } } } @@ -371,6 +393,11 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri if (this.isGitConfigMonitorEnabled) { this.gitConfigMonitor.setActive(true); } + + if (this.isDagManagerEnabled) { + this.dagManager.setActive(true); + } + } else { if (this.isSchedulerEnabled) { LOGGER.info("[Init] Gobblin Service is running in slave instance mode, not enabling Scheduler."); @@ -385,6 +412,11 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri if (this.isGitConfigMonitorEnabled) { this.gitConfigMonitor.setActive(true); } + + if (this.isDagManagerEnabled) { + this.dagManager.setActive(true); + } + } // Populate TopologyCatalog with all Topologies generated by TopologySpecFactory @@ -479,7 +511,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri private ContextAwareHistogram serviceLeadershipChange; public Metrics(final MetricContext metricContext, Config config) { - int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); + int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, + ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); this.serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES); 
this.contextAwareMetrics.add(this.serviceLeadershipChange); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java index 1b7ce11..4b81f1f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java @@ -17,13 +17,11 @@ package org.apache.gobblin.service.modules.flow; -import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import com.google.common.collect.Lists; import com.typesafe.config.Config; import lombok.Getter; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 3606897..b41d1d0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.flowgraph; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -79,11 +80,11 @@ public class Dag<T> { } public List<DagNode<T>> getChildren(DagNode node) { - return parentChildMap.getOrDefault(node, null); + return 
parentChildMap.getOrDefault(node, Collections.EMPTY_LIST); } public List<DagNode<T>> getParents(DagNode node) { - return node.parentNodes; + return (node.parentNodes != null)? node.parentNodes : Collections.EMPTY_LIST; } public boolean isEmpty() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java new file mode 100644 index 0000000..ac304fb --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AbstractIdleService; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.monitoring.JobStatus; +import org.apache.gobblin.service.monitoring.JobStatusRetriever; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * This class implements a manager to manage the life cycle of a {@link Dag}. A {@link Dag} is submitted to the + * {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. 
On receiving a {@link Dag}, the + * {@link DagManager} first persists the {@link Dag} to the {@link DagStateStore}, and then submits it to a {@link BlockingQueue}. + * This guarantees that each {@link Dag} received by the {@link DagManager} can be recovered in case of a leadership + * change or service restart. + * + * The implementation of the {@link DagManager} is multi-threaded. Each {@link DagManagerThread} polls the + * {@link BlockingQueue} for new Dag submissions at fixed intervals. It dequeues any newly submitted Dags and coordinates + * the execution of individual jobs in the Dag. The coordination logic involves polling the {@link JobStatus}es of running + * jobs. Upon completion of a job, it will either schedule the next job in the Dag (on SUCCESS) or mark the Dag as failed + * (on FAILURE). Upon completion of a Dag execution, it will perform the required clean up actions. + * + * The {@link DagManager} is active only in leader mode. To ensure recoverability, each {@link Dag} managed by a {@link DagManager} is + * checkpointed to a persistent location. On start up or leadership change, + * the {@link DagManager} loads all the checkpointed {@link Dag}s and adds them to the {@link BlockingQueue}. + * The current implementation supports only FileSystem-based checkpointing of the Dag statuses.
+ */ +@Alpha +@Slf4j +public class DagManager extends AbstractIdleService { + private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10; + private static final Integer DEFAULT_NUM_THREADS = 3; + private static final Integer TERMINATION_TIMEOUT = 30; + + public static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager."; + public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads"; + public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval"; + public static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever"; + public static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass"; + public static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + "dagStateStoreDir"; + + private BlockingQueue<Dag<JobExecutionPlan>> queue; + private ScheduledExecutorService scheduledExecutorPool; + private boolean instrumentationEnabled; + + private final Integer numThreads; + private final Integer pollingInterval; + private final JobStatusRetriever jobStatusRetriever; + private final DagStateStore dagStateStore; + private volatile boolean isActive = false; + + public DagManager(Config config, boolean instrumentationEnabled) { + this.queue = new LinkedBlockingDeque<>(); + this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS); + this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads); + this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL); + this.instrumentationEnabled = instrumentationEnabled; + + try { + Class jobStatusRetrieverClass = Class.forName(config.getString(JOB_STATUS_RETRIEVER_KEY)); + this.jobStatusRetriever = + (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass); + Class dagStateStoreClass = Class.forName(config.getString(DAG_STORE_CLASS_KEY)); + this.dagStateStore = 
(DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Exception encountered during DagManager initialization", e); + } + } + + public DagManager(Config config) { + this(config, true); + } + + /** Start the service. On startup, the service launches a fixed pool of {@link DagManagerThread}s, which are scheduled at + * fixed intervals. The service also loads any {@link Dag}s + */ + @Override + protected void startUp() { + //On startup, the service creates tasks that are scheduled at a fixed rate. + for (int i = 0; i < numThreads; i++) { + this.scheduledExecutorPool.scheduleAtFixedRate(new DagManagerThread(jobStatusRetriever, dagStateStore, queue, instrumentationEnabled), 0, this.pollingInterval, + TimeUnit.SECONDS); + } + } + + /** + * Method to submit a {@link Dag} to the {@link DagManager}. The {@link DagManager} first persists the + * submitted dag to the {@link DagStateStore} and then adds the dag to a {@link BlockingQueue} to be picked up + * by one of the {@link DagManagerThread}s. + */ + public synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException { + //Persist the dag + this.dagStateStore.writeCheckpoint(dag); + //Add it to the queue of dags + if (!this.queue.offer(dag)) { + throw new IOException("Could not add dag" + DagManagerUtils.generateDagId(dag) + "to queue"); + } + } + + /** + * When a {@link DagManager} becomes active, it loads the serialized representations of the currently running {@link Dag}s + * from the checkpoint directory, deserializes the {@link Dag}s and adds them to a queue to be consumed by + * the {@link DagManagerThread}s. + * @param active a boolean to indicate if the {@link DagManager} is the leader. 
+ */ + public synchronized void setActive(boolean active) { + this.isActive = active; + try { + if (this.isActive) { + for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) { + offer(dag); + } + } + } catch (IOException e) { + throw new RuntimeException("Exception encountered when activating the new DagManager", e); + } + } + + /** + * Each {@link DagManagerThread} performs 2 actions when scheduled: + * <ol> + * <li> Dequeues any newly submitted {@link Dag}s from the Dag queue. All the {@link JobExecutionPlan}s which + * are part of the dequed {@link Dag} will be managed this thread. </li> + * <li> Polls the job status store for the current job statuses of all the running jobs it manages.</li> + * </ol> + */ + public static class DagManagerThread implements Runnable { + private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>(); + private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>(); + private final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>(); + private final Set<String> failedDagIds = new HashSet<>(); + private final MetricContext metricContext; + private final Optional<EventSubmitter> eventSubmitter; + + private JobStatusRetriever jobStatusRetriever; + private DagStateStore dagStateStore; + private BlockingQueue<Dag<JobExecutionPlan>> queue; + + /** + * Constructor. 
+ */ + public DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, BlockingQueue<Dag<JobExecutionPlan>> queue, + boolean instrumentationEnabled) { + this.jobStatusRetriever = jobStatusRetriever; + this.dagStateStore = dagStateStore; + this.queue = queue; + if (instrumentationEnabled) { + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()); + this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); + } else { + this.metricContext = null; + this.eventSubmitter = Optional.absent(); + } + } + + /** + * Main body of the {@link DagManagerThread}. + */ + @Override + public void run() { + try { + Object nextItem = queue.poll(); + //Poll the queue for a new Dag to execute. + if (nextItem != null) { + Dag<JobExecutionPlan> dag = (Dag<JobExecutionPlan>) nextItem; + + //Initialize dag. + initialize(dag); + } + log.info("Polling job statuses.."); + TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent() + ? eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.ALL_JOB_STATUSES_POLLED) + : null; + //Poll and update the job statuses of running jobs. + pollJobStatuses(); + log.info("Poll done."); + if (jobStatusPollTimer != null) { + jobStatusPollTimer.stop(); + } + //Clean up any finished dags + log.info("Cleaning up finished dags.."); + cleanUp(); + log.info("Clean up done"); + } catch (Exception e) { + log.error("Exception encountered in {}", getClass().getName(), e); + } + } + + /** + * This method determines the next set of jobs to execute from the dag and submits them for execution. + * This method updates internal data structures tracking currently running Dags and jobs. 
+ */ + private void initialize(Dag<JobExecutionPlan> dag) + throws IOException { + //Add Dag to the map of running dags + String dagId = DagManagerUtils.generateDagId(dag); + log.info("Initializing Dag {}", dagId); + if (this.dags.containsKey(dagId)) { + log.warn("Already tracking a dag with dagId {}, skipping.", dagId); + return; + } + + this.dags.put(dagId, dag); + log.info("Dag {} - determining if any jobs are already running."); + //Are there any jobs already in the running state? This check is for Dags already running + //before a leadership change occurs. + for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) { + if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) { + addJobState(dagId, dagNode); + } + } + log.info("Dag {} submitting jobs ready for execution."); + //Determine the next set of jobs to run and submit them for execution + submitNext(dagId); + log.info("Dag {} Initialization complete."); + } + + /** + * Poll the statuses of running jobs. + * @return List of {@link JobStatus}es. + */ + private void pollJobStatuses() + throws IOException { + this.failedDagIds.clear(); + for (Dag.DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) { + TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent() + ? eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.JOB_STATUS_POLLED) + : null; + JobStatus jobStatus = pollJobStatus(node); + if (jobStatusPollTimer != null) { + jobStatusPollTimer.stop(); + } + Preconditions.checkNotNull(jobStatus, "Received null job status for a running job " + DagManagerUtils.getJobName(node)); + JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node); + //TODO: This will be updated when JobStatus schema provides the correct execution status. + //Currently, it is a placeholder. 
+ switch (jobStatus.getEventName()) { + case TimingEvent.LauncherTimings.JOB_COMPLETE: + jobExecutionPlan.setExecutionStatus(ExecutionStatus.COMPLETE); + onJobFinish(node); + break; + case TimingEvent.LauncherTimings.JOB_FAILED: + jobExecutionPlan.setExecutionStatus(ExecutionStatus.FAILED); + onJobFinish(node); + break; + default: + jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING); + break; + } + } + } + + /** + * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}. + */ + private JobStatus pollJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) { + Config jobConfig = dagNode.getValue().getJobSpec().getConfig(); + String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + long flowExecutionId = jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY); + String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY); + String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + + Iterator<JobStatus> jobStatusIterator = + this.jobStatusRetriever.getJobStatusesForFlowExecution(flowGroup, flowName, flowExecutionId, jobGroup, jobName); + if (jobStatusIterator.hasNext()) { + return jobStatusIterator.next(); + } else { + return null; + } + } + + public void submitNext(String dagId) + throws IOException { + Dag<JobExecutionPlan> dag = this.dags.get(dagId); + //Submit jobs from the dag ready for execution. + for (Dag.DagNode<JobExecutionPlan> dagNode : DagManagerUtils.getNext(dag)) { + submitJob(dagNode); + addJobState(dagId, dagNode); + } + //Checkpoint the dag state + this.dagStateStore.writeCheckpoint(dag); + } + + /** + * Submits a {@link JobSpec} to a {@link org.apache.gobblin.runtime.api.SpecExecutor}. 
+ */ + private void submitJob(Dag.DagNode<JobExecutionPlan> dagNode) { + JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode); + jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING); + JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode); + + // Run this spec on selected executor + SpecProducer producer = null; + try { + producer = DagManagerUtils.getSpecProducer(dagNode); + Config jobConfig = DagManagerUtils.getJobConfig(dagNode); + if (!jobConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { + log.warn("JobSpec does not contain flowExecutionId."); + } + log.info("Submitting job: {} on executor: {}", jobSpec, producer); + producer.addSpec(jobSpec); + } catch (Exception e) { + log.error("Cannot submit job: {} on executor: {}", jobSpec, producer, e); + } + } + + /** + * Method that defines the actions to be performed when a job finishes either successfully or with failure. + * This method updates the state of the dag and performs clean up actions as necessary. 
+     */
+    private void onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode)
+        throws IOException {
+      Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
+      String dagId = DagManagerUtils.generateDagId(dag);
+      String jobName = DagManagerUtils.getJobName(dagNode);
+      ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
+      log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
+
+      // The finished job is no longer tracked as running, whatever its outcome.
+      deleteJobState(dagId, dagNode);
+
+      if (jobStatus == ExecutionStatus.COMPLETE) {
+        submitNext(dagId);
+      } else if (jobStatus == ExecutionStatus.FAILED) {
+        // Only recorded here; the dag itself is torn down later by cleanUp().
+        this.failedDagIds.add(dagId);
+      }
+    }
+
+    /**
+     * Stop tracking the given job: drop it from the job-to-dag map and from the job list of its dag.
+     */
+    private void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
+      this.jobToDag.remove(dagNode);
+      this.dagToJobs.get(dagId).remove(dagNode);
+    }
+
+    /**
+     * Start tracking the given job: map it to its dag and append it to the job list of that dag,
+     * creating the list on first use.
+     */
+    private void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
+      Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+      this.jobToDag.put(dagNode, dag);
+      if (this.dagToJobs.containsKey(dagId)) {
+        this.dagToJobs.get(dagId).add(dagNode);
+      } else {
+        LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = Lists.newLinkedList();
+        dagNodeList.add(dagNode);
+        this.dagToJobs.put(dagId, dagNodeList);
+      }
+    }
+
+    /**
+     * @return true if at least one job of the dag is still tracked; a dag without any tracked job list is
+     * reported as having no running jobs instead of failing with a {@link NullPointerException}.
+     */
+    private boolean hasRunningJobs(String dagId) {
+      LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
+      return dagNodeList != null && !dagNodeList.isEmpty();
+    }
+
+    /**
+     * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
+     */
+    private void cleanUp() {
+      //Clean up failed dags
+      for (String dagId : this.failedDagIds) {
+        //Skip monitoring of any other jobs of the failed dag.
+        LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = this.dagToJobs.get(dagId);
+        while (dagNodeList != null && !dagNodeList.isEmpty()) {
+          Dag.DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
+          deleteJobState(dagId, dagNode);
+        }
+        log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
+        cleanUpDag(dagId);
+      }
+      // Every failed dag has been handled; without this, the next pass would revisit ids whose state is gone.
+      this.failedDagIds.clear();
+
+      //Clean up successfully completed dags
+      // Iterate over a snapshot of the ids: cleanUpDag() removes entries from this.dags, which must not
+      // happen while iterating the live key set.
+      for (String dagId : Lists.newArrayList(this.dags.keySet())) {
+        if (!hasRunningJobs(dagId)) {
+          log.info("Dag {} has finished with status COMPLETE; Cleaning up dag from the state store.", dagId);
+          cleanUpDag(dagId);
+        }
+      }
+    }
+
+    /**
+     * Forget the dag and delete its checkpoint from the state store. Does nothing for the state store when
+     * the dag is not (or no longer) known.
+     */
+    private void cleanUpDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+      this.dagToJobs.remove(dagId);
+      this.dags.remove(dagId);
+      if (dag != null) {
+        this.dagStateStore.cleanUp(dag);
+      }
+    }
+  }
+
+  /** Stop the service. */
+  @Override
+  protected void shutDown()
+      throws Exception {
+    this.scheduledExecutorPool.shutdown();
+    this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
new file mode 100644
index 0000000..348aa78
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+public class DagManagerUtils {
+  /**
+   * Generate a dagId from the given {@link Dag} instance. The id is built from the flow group, flow name and
+   * flow execution id found in the job config of the first start node, joined with "_".
+   * @param dag instance of a {@link Dag}.
+   * @return a String id corresponding to the {@link Dag} instance.
+   */
+  public static String generateDagId(Dag<JobExecutionPlan> dag) {
+    Config firstJobConfig = getJobConfig(dag.getStartNodes().get(0));
+    String group = firstJobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String name = firstJobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    Long executionId = firstJobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+    return Joiner.on("_").join(group, name, executionId);
+  }
+
+  /** @return the job name stored in the config of the node's {@link JobSpec}. */
+  public static String getJobName(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return getJobConfig(dagNode).getString(ConfigurationKeys.JOB_NAME_KEY);
+  }
+
+  /** @return the {@link JobExecutionPlan} held by the node. */
+  public static JobExecutionPlan getJobExecutionPlan(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return dagNode.getValue();
+  }
+
+  /** @return the {@link JobSpec} of the node's {@link JobExecutionPlan}. */
+  public static JobSpec getJobSpec(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return getJobExecutionPlan(dagNode).getJobSpec();
+  }
+
+  /** @return the {@link Config} of the node's {@link JobSpec}. */
+  public static Config getJobConfig(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return getJobSpec(dagNode).getConfig();
+  }
+
+  /** @return the {@link SpecProducer} obtained from the future exposed by the node's spec executor. */
+  public static SpecProducer getSpecProducer(Dag.DagNode<JobExecutionPlan> dagNode)
+      throws ExecutionException, InterruptedException {
+    return getJobExecutionPlan(dagNode).getSpecExecutor().getProducer().get();
+  }
+
+  /** @return the {@link ExecutionStatus} recorded on the node's {@link JobExecutionPlan}. */
+  public static ExecutionStatus getExecutionStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return getJobExecutionPlan(dagNode).getExecutionStatus();
+  }
+
+  /**
+   * Traverse the dag to determine the next set of nodes to be executed. It starts with the startNodes of the dag and
+   * identifies each node yet to be executed and for which each of its parent nodes is in the {@link ExecutionStatus#COMPLETE}
+   * state. If the traversal reaches a node that is neither {@link ExecutionStatus#$UNKNOWN} nor
+   * {@link ExecutionStatus#COMPLETE}, an empty set is returned.
+   */
+  public static Set<Dag.DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag) {
+    Set<Dag.DagNode<JobExecutionPlan>> runnable = new HashSet<>();
+    LinkedList<Dag.DagNode<JobExecutionPlan>> frontier = Lists.newLinkedList(dag.getStartNodes());
+
+    while (!frontier.isEmpty()) {
+      Dag.DagNode<JobExecutionPlan> current = frontier.poll();
+      ExecutionStatus status = getExecutionStatus(current);
+
+      if (status == ExecutionStatus.COMPLETE) {
+        //Explore the children of COMPLETED node as next candidates for execution.
+        frontier.addAll(dag.getChildren(current));
+        continue;
+      }
+      if (status != ExecutionStatus.$UNKNOWN) {
+        return new HashSet<>();
+      }
+
+      //Add a node to be executed next, only if all of its parent nodes are COMPLETE.
+      List<Dag.DagNode<JobExecutionPlan>> parents = dag.getParents(current);
+      boolean allParentsComplete =
+          parents.stream().allMatch(parent -> getExecutionStatus(parent) == ExecutionStatus.COMPLETE);
+      if (allParentsComplete) {
+        runnable.add(current);
+      }
+    }
+    return runnable;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
new file mode 100644
index 0000000..0984a91
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface for storing and retrieving currently running {@link Dag}s of {@link JobExecutionPlan}s. In case of a
+ * leadership change in the {@link org.apache.gobblin.service.modules.core.GobblinServiceManager}, the corresponding
+ * {@link DagManager} loads the running {@link Dag}s from the {@link DagStateStore} to resume their execution.
+ */
+@Alpha
+public interface DagStateStore {
+  /**
+   * Persist the {@link Dag} to the backing store.
+   * @param dag the {@link Dag} to be persisted.
+   * @throws IOException if the {@link Dag} cannot be written to the backing store.
+   */
+  public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException;
+
+  /**
+   * Delete the {@link Dag} from the backing store, typically upon completion of execution.
+   * @param dag the {@link Dag} to be deleted.
+   */
+  public void cleanUp(Dag<JobExecutionPlan> dag);
+
+  /**
+   * Load all currently running {@link Dag}s from the underlying store. Typically, invoked when a new {@link DagManager}
+   * takes over or on restart of service.
+   * @return a {@link List} of currently running {@link Dag}s.
+   * @throws IOException if the {@link Dag}s cannot be read from the underlying store.
+   */
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
new file mode 100644
index 0000000..0720210
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+import com.google.gson.reflect.TypeToken;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
+
+
+/**
+ * A {@link DagStateStore} that keeps one JSON checkpoint file per {@link Dag} in a local directory. Each file is
+ * named after the dag id with the {@link #DAG_FILE_EXTENSION} suffix and holds the serialized list of
+ * {@link JobExecutionPlan}s of the dag.
+ */
+@Alpha
+@Slf4j
+public class FSDagStateStore implements DagStateStore {
+  public static final String DAG_FILE_EXTENSION = ".dag";
+
+  /** Type token for ser/de JobExecutionPlan list */
+  private static final Type LIST_JOBEXECUTIONPLAN_TYPE = new TypeToken<List<JobExecutionPlan>>(){}.getType();
+
+  private final String dagCheckpointDir;
+  /** Gson instance configured with the JobExecutionPlan list serializer and deserializer. */
+  private final Gson gson;
+
+  /**
+   * @param config must define {@link DagManager#DAG_STATESTORE_DIR}, the directory holding the checkpoint files.
+   */
+  public FSDagStateStore(Config config) {
+    this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
+    JsonSerializer<List<JobExecutionPlan>> serializer = new JobExecutionPlanListSerializer();
+    JsonDeserializer<List<JobExecutionPlan>> deserializer = new JobExecutionPlanListDeserializer();
+    this.gson = new GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer)
+        .registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, deserializer).create();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
+    // write to a temporary name then rename to make the operation atomic when the file system allows a file to be
+    // replaced
+    String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
+    String serializedDag = serializeDag(dag);
+
+    File checkpointDir = new File(this.dagCheckpointDir);
+    if (!checkpointDir.exists()) {
+      if (!checkpointDir.mkdirs()) {
+        throw new IOException("Could not create dir - " + this.dagCheckpointDir);
+      }
+    }
+
+    File tmpCheckpointFile = new File(this.dagCheckpointDir, fileName + ".tmp");
+    File checkpointFile = new File(this.dagCheckpointDir, fileName);
+
+    Files.write(serializedDag, tmpCheckpointFile, Charsets.UTF_8);
+    Files.move(tmpCheckpointFile, checkpointFile);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void cleanUp(Dag<JobExecutionPlan> dag) {
+    String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
+
+    //Delete the dag checkpoint file from the checkpoint directory
+    File checkpointFile = new File(this.dagCheckpointDir, fileName);
+    if (!checkpointFile.delete()) {
+      log.error("Could not delete checkpoint file: {}", checkpointFile.getName());
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Returns an empty list when the checkpoint directory does not exist, which is the case until the first
+   * checkpoint has been written.</p>
+   */
+  @Override
+  public List<Dag<JobExecutionPlan>> getDags() throws IOException {
+    List<Dag<JobExecutionPlan>> runningDags = Lists.newArrayList();
+    File dagCheckpointFolder = new File(this.dagCheckpointDir);
+    if (!dagCheckpointFolder.exists()) {
+      // The directory is only created by writeCheckpoint(); nothing has been checkpointed yet.
+      return runningDags;
+    }
+
+    // listFiles() returns null (rather than throwing) when the path is not a directory or an I/O error occurs.
+    File[] dagFiles = dagCheckpointFolder.listFiles((dir, name) -> name.endsWith(DAG_FILE_EXTENSION));
+    if (dagFiles == null) {
+      throw new IOException("Could not list dag checkpoint files in dir - " + this.dagCheckpointDir);
+    }
+    for (File file : dagFiles) {
+      runningDags.add(getDag(file));
+    }
+    return runningDags;
+  }
+
+  /**
+   * Return a {@link Dag} given a checkpoint file.
+   * @param dagFile the checkpoint file holding the serialized {@link Dag}.
+   * @return the {@link Dag} associated with the dagFile.
+   * @throws IOException if the file cannot be read.
+   */
+  @VisibleForTesting
+  public Dag<JobExecutionPlan> getDag(File dagFile) throws IOException {
+    String serializedDag = Files.toString(dagFile, Charsets.UTF_8);
+    return deserializeDag(serializedDag);
+  }
+
+  /**
+   * Serialize a {@link Dag} of {@link JobExecutionPlan}s.
+   * @param dag A Dag parametrized by type {@link JobExecutionPlan}.
+   * @return a JSON string representation of the Dag object.
+   */
+  private String serializeDag(Dag<JobExecutionPlan> dag) {
+    List<JobExecutionPlan> jobExecutionPlanList = dag.getNodes().stream().map(Dag.DagNode::getValue).collect(Collectors.toList());
+    return gson.toJson(jobExecutionPlanList, LIST_JOBEXECUTIONPLAN_TYPE);
+  }
+
+  /**
+   * De-serialize a Dag.
+   * @param jsonDag A string representation of a Dag.
+   * @return a {@link Dag} parametrized by {@link JobExecutionPlan}.
+   */
+  private Dag<JobExecutionPlan> deserializeDag(String jsonDag) {
+    return new JobExecutionPlanDagFactory().createDag(gson.fromJson(jsonDag, LIST_JOBEXECUTIONPLAN_TYPE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 1f7f737..ea0d0bd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -75,6 +75,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   protected final Logger _log;
   protected final SpecCompiler specCompiler;
   protected final Optional<TopologyCatalog> topologyCatalog;
+  protected final Optional<DagManager> dagManager;
 
   protected final MetricContext metricContext;
protected final Optional<EventSubmitter> eventSubmitter; @@ -87,7 +88,8 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { private final ClassAliasResolver<SpecCompiler> aliasResolver; - public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<Logger> log, boolean instrumentationEnabled) { + public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log, + boolean instrumentationEnabled) { _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); if (instrumentationEnabled) { this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); @@ -95,8 +97,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER)); this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER)); this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); - } - else { + } else { this.metricContext = null; this.flowOrchestrationSuccessFulMeter = Optional.absent(); this.flowOrchestrationFailedMeter = Optional.absent(); @@ -106,6 +107,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class); this.topologyCatalog = topologyCatalog; + this.dagManager = dagManager; try { String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS; if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) { @@ -121,25 +123,25 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { } } - public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<Logger> log) { - this(config, 
topologyCatalog, log, true); + public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) { + this(config, topologyCatalog, dagManager, log, true); } - public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Logger log) { - this(config, topologyCatalog, Optional.of(log)); + public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Logger log) { + this(config, topologyCatalog, dagManager, Optional.of(log)); } public Orchestrator(Config config, Logger log) { - this(config, Optional.<TopologyCatalog>absent(), Optional.of(log)); + this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.of(log)); } /** Constructor with no logging */ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog) { - this(config, topologyCatalog, Optional.<Logger>absent()); + this(config, topologyCatalog, Optional.<DagManager>absent(), Optional.<Logger>absent()); } public Orchestrator(Config config) { - this(config, Optional.<TopologyCatalog>absent(), Optional.<Logger>absent()); + this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.<Logger>absent()); } @VisibleForTesting @@ -216,35 +218,39 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { flowCompilationTimer.stop(flowMetadata); } - // Schedule all compiled JobSpecs on their respective Executor - for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) { - JobExecutionPlan jobExecutionPlan = dagNode.getValue(); - - // Run this spec on selected executor - SpecProducer producer = null; - try { - producer = jobExecutionPlan.getSpecExecutor().getProducer().get(); - Spec jobSpec = jobExecutionPlan.getJobSpec(); - - if (!((JobSpec)jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { - _log.warn("JobSpec does not contain flowExecutionId."); - } - - 
Map<String, String> jobMetadata = getJobMetadata(flowMetadata, jobExecutionPlan); - _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); - - TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() - ? this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) - : null; - - producer.addSpec(jobSpec); - - if (jobOrchestrationTimer != null) { - jobOrchestrationTimer.stop(jobMetadata); + if (this.dagManager.isPresent()) { + //Send the dag to the DagManager. + this.dagManager.get().offer(jobExecutionPlanDag); + } else { + // Schedule all compiled JobSpecs on their respective Executor + for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) { + JobExecutionPlan jobExecutionPlan = dagNode.getValue(); + + // Run this spec on selected executor + SpecProducer producer = null; + try { + producer = jobExecutionPlan.getSpecExecutor().getProducer().get(); + Spec jobSpec = jobExecutionPlan.getJobSpec(); + + if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { + _log.warn("JobSpec does not contain flowExecutionId."); + } + + Map<String, String> jobMetadata = getJobMetadata(flowMetadata, jobExecutionPlan); + _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); + + TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get(). 
+ getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null; + + producer.addSpec(jobSpec); + + if (jobOrchestrationTimer != null) { + jobOrchestrationTimer.stop(jobMetadata); + } + } catch (Exception e) { + _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + + " for flow: " + spec, e); } - } catch(Exception e) { - _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + - " for flow: " + spec, e); } } } else { @@ -255,7 +261,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } - private Map<String,String> getFlowMetadata(FlowSpec flowSpec) { + private Map<String, String> getFlowMetadata(FlowSpec flowSpec) { Map<String, String> metadata = Maps.newHashMap(); metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)); @@ -267,7 +273,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { return metadata; } - private Map<String,String> getJobMetadata(Map<String,String> flowMetadata, JobExecutionPlan jobExecutionPlan) { + private Map<String, String> getJobMetadata(Map<String, String> flowMetadata, JobExecutionPlan jobExecutionPlan) { Map<String, String> jobMetadata = Maps.newHashMap(); JobSpec jobSpec = jobExecutionPlan.getJobSpec(); SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor(); @@ -306,9 +312,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer)); producer.deleteSpec(jobSpec.getUri(), headers); - } catch(Exception e) { - _log.error("Cannot successfully delete spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + - " for flow: " + spec, e); + } catch (Exception e) { + 
_log.error("Cannot successfully delete spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + + " for flow: " + spec, e); } } } else { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index 1ec791a..ee4ecb7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -21,8 +21,6 @@ import java.net.URI; import java.util.Collection; import java.util.Map; import java.util.Properties; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; import org.apache.commons.lang.StringUtils; import org.apache.helix.HelixManager; @@ -57,6 +55,7 @@ import org.apache.gobblin.scheduler.BaseGobblinJob; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PropertiesUtils; @@ -95,10 +94,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata } public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager, - Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, SchedulerService schedulerService, - Optional<Logger> log) + Optional<FlowCatalog> 
flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, + SchedulerService schedulerService, Optional<Logger> log) throws Exception { - this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, log), + this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, dagManager, log), schedulerService, log); } @@ -124,9 +123,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata } } } - } - // Since we are going to change status to isActive=false, unschedule all flows - else { + } else { + // Since we are going to change status to isActive=false, unschedule all flows for (Spec spec : this.scheduledFlowSpecs.values()) { onDeleteSpec(spec.getUri(), spec.getVersion()); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index c0c9297..438bed0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -26,13 +26,14 @@ import com.google.common.base.Joiner; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; -import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.service.ExecutionStatus; import 
org.apache.gobblin.util.ConfigUtils; @@ -41,10 +42,11 @@ import org.apache.gobblin.util.ConfigUtils; * where the {@link JobSpec} will be executed. */ @Data -@AllArgsConstructor +@EqualsAndHashCode (exclude = {"executionStatus"}) public class JobExecutionPlan { - private JobSpec jobSpec; - private SpecExecutor specExecutor; + private final JobSpec jobSpec; + private final SpecExecutor specExecutor; + private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN; public static class Factory { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java new file mode 100644 index 0000000..0ab13ba --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Gson deserializer that rebuilds a list of {@link JobExecutionPlan}s from a JSON array. Each array element is
+ * expected to carry a job spec object, a spec executor object and an execution status, keyed by the constants
+ * in {@link SerializationConstants}.
+ */
+@Slf4j
+public class JobExecutionPlanListDeserializer implements JsonDeserializer<List<JobExecutionPlan>> {
+  /**
+   * Gson invokes this call-back method during deserialization when it encounters a field of the
+   * specified type.
+   * <p>In the implementation of this call-back method, you should consider invoking
+   * {@link JsonDeserializationContext#deserialize(JsonElement, Type)} method to create objects
+   * for any non-trivial field of the returned object. However, you should never invoke it on the
+   * same type passing {@code json} since that will cause an infinite loop (Gson will call your
+   * call-back method again).
+   *
+   * <p>The job spec uri, version, description and template URI are optional: a member that is absent or
+   * JSON null is simply not applied to the {@link JobSpec} builder.
+   *
+   * @param json The Json data being deserialized
+   * @param typeOfT The type of the Object to deserialize to
+   * @param context the deserialization context supplied by Gson; not used by this implementation.
+   * @return a deserialized object of the specified type typeOfT which is a subclass of {@code T}
+   * @throws JsonParseException if json is not in the expected format of {@code typeofT}
+   */
+  @Override
+  public List<JobExecutionPlan> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
+      throws JsonParseException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    JsonArray jsonArray = json.getAsJsonArray();
+
+    for (JsonElement jsonElement: jsonArray) {
+      JsonObject serializedJobExecutionPlan = (JsonObject) jsonElement;
+      JsonObject jobSpecJson = (JsonObject) serializedJobExecutionPlan.get(SerializationConstants.JOB_SPEC_KEY);
+      JsonObject specExecutorJson = (JsonObject) serializedJobExecutionPlan.get(SerializationConstants.SPEC_EXECUTOR_KEY);
+      ExecutionStatus executionStatus = ExecutionStatus.valueOf(serializedJobExecutionPlan.
+          get(SerializationConstants.EXECUTION_STATUS_KEY).getAsString());
+
+      // Optional members: read them null-safely so the null checks on the builder below are actually reachable.
+      String uri = getStringOrNull(jobSpecJson, SerializationConstants.JOB_SPEC_URI_KEY);
+      String version = getStringOrNull(jobSpecJson, SerializationConstants.JOB_SPEC_VERSION_KEY);
+      String description = getStringOrNull(jobSpecJson, SerializationConstants.JOB_SPEC_DESCRIPTION_KEY);
+      String templateURI = getStringOrNull(jobSpecJson, SerializationConstants.JOB_SPEC_TEMPLATE_URI_KEY);
+      String config = jobSpecJson.get(SerializationConstants.JOB_SPEC_CONFIG_KEY).getAsString();
+      Config jobConfig = ConfigFactory.parseString(config);
+      JobSpec jobSpec;
+      try {
+        JobSpec.Builder builder = (uri == null) ? JobSpec.builder() : JobSpec.builder(uri);
+        builder = (templateURI == null) ? builder : builder.withTemplate(new URI(templateURI));
+        builder = (version == null) ? builder : builder.withVersion(version);
+        builder = (description == null) ? builder : builder.withDescription(description);
+        jobSpec = builder.withConfig(jobConfig).build();
+      } catch (URISyntaxException e) {
+        log.error("Error deserializing JobSpec {}", config);
+        throw new RuntimeException(e);
+      }
+
+      Config specExecutorConfig = ConfigFactory.parseString(specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CONFIG_KEY).getAsString());
+      String className = specExecutorJson.get(SerializationConstants.SPEC_EXECUTOR_CLASS_KEY).getAsString();
+      SpecExecutor specExecutor;
+      try {
+        specExecutor =
+            (SpecExecutor) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(className), specExecutorConfig);
+      } catch (ReflectiveOperationException e) {
+        log.error("Error deserializing specExecuor {}", specExecutorConfig);
+        throw new RuntimeException(e);
+      }
+
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, specExecutor);
+      jobExecutionPlan.setExecutionStatus(executionStatus);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return jobExecutionPlans;
+  }
+
+  /**
+   * Read an optional string member of a JSON object.
+   * @param jsonObject the object to read from.
+   * @param key the member name.
+   * @return the member as a String, or null if the member is absent or JSON null.
+   */
+  private static String getStringOrNull(JsonObject jsonObject, String key) {
+    JsonElement element = jsonObject.get(key);
+    return (element == null || element.isJsonNull()) ? null : element.getAsString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
new file mode 100644
index 0000000..3de0209
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.spec; + +import java.lang.reflect.Type; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.runtime.api.JobSpec; + +@Slf4j +public class JobExecutionPlanListSerializer implements JsonSerializer<List<JobExecutionPlan>> { + /** + * Gson invokes this call-back method during serialization when it encounters a field of the + * specified type. + * + * <p>In the implementation of this call-back method, you should consider invoking + * {@link JsonSerializationContext#serialize(Object, Type)} method to create JsonElements for any + * non-trivial field of the {@code src} object. However, you should never invoke it on the + * {@code src} object itself since that will cause an infinite loop (Gson will call your + * call-back method again).</p> + * + * @param src the object that needs to be converted to Json. + * @param typeOfSrc the actual type (fully genericized version) of the source object. + * @param context + * @return a JsonElement corresponding to the specified object. 
+ */ + @Override + public JsonElement serialize(List<JobExecutionPlan> src, Type typeOfSrc, JsonSerializationContext context) { + JsonArray jsonArray = new JsonArray(); + + for (JobExecutionPlan jobExecutionPlan: src) { + JsonObject jobExecutionPlanJson = new JsonObject(); + JsonObject jobSpecJson = new JsonObject(); + JobSpec jobSpec = jobExecutionPlan.getJobSpec(); + String uri = (jobSpec.getUri() != null) ? jobSpec.getUri().toString() : null; + jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_URI_KEY, uri); + jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_VERSION_KEY, jobSpec.getVersion()); + jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_DESCRIPTION_KEY, jobSpec.getDescription()); + String jobSpecTemplateURI = (jobSpec.getTemplateURI().isPresent()) ? jobSpec.getTemplateURI().get().toString() : null; + jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_TEMPLATE_URI_KEY, jobSpecTemplateURI); + jobSpecJson.addProperty(SerializationConstants.JOB_SPEC_CONFIG_KEY, jobSpec.getConfig().root().render(ConfigRenderOptions.concise())); + jobExecutionPlanJson.add(SerializationConstants.JOB_SPEC_KEY, jobSpecJson); + Config specExecutorConfig; + try { + specExecutorConfig = jobExecutionPlan.getSpecExecutor().getConfig().get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Error serializing JobExecutionPlan {}", jobExecutionPlan.toString()); + throw new RuntimeException(e); + } + JsonObject specExecutorJson = new JsonObject(); + specExecutorJson.addProperty(SerializationConstants.SPEC_EXECUTOR_CONFIG_KEY, + specExecutorConfig.root().render(ConfigRenderOptions.concise())); + specExecutorJson.addProperty(SerializationConstants.SPEC_EXECUTOR_CLASS_KEY, + jobExecutionPlan.getSpecExecutor().getClass().getName()); + jobExecutionPlanJson.add(SerializationConstants.SPEC_EXECUTOR_KEY, specExecutorJson); + + String executionStatus = jobExecutionPlan.getExecutionStatus().name(); + 
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY, executionStatus); + + jsonArray.add(jobExecutionPlanJson); + } + return jsonArray; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java new file mode 100644 index 0000000..ecffad9 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * JSON member names used when a {@code JobExecutionPlan} is written to and read from JSON.
 *
 * <p>This class only holds constants and cannot be instantiated.</p>
 */
public final class SerializationConstants {
  // Members of the job spec object.
  public static final String JOB_SPEC_URI_KEY = "uri";
  public static final String JOB_SPEC_VERSION_KEY = "version";
  public static final String JOB_SPEC_DESCRIPTION_KEY = "description";
  public static final String JOB_SPEC_TEMPLATE_URI_KEY = "templateURI";
  public static final String JOB_SPEC_CONFIG_KEY = "config";
  public static final String JOB_SPEC_KEY = "jobSpec";

  // Members of the spec executor object.
  public static final String SPEC_EXECUTOR_CONFIG_KEY = "config";
  public static final String SPEC_EXECUTOR_CLASS_KEY = "class";
  public static final String SPEC_EXECUTOR_KEY = "specExecutor";

  public static final String EXECUTION_STATUS_KEY = "executionStatus";

  /** Not instantiable: this class is a holder for constants only. */
  private SerializationConstants() {
  }
}
- Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode)); + Assert.assertEquals(jobExecutionPlanDag.getChildren(dagNode).size(), 0); } @Test @@ -245,7 +245,7 @@ public class IdentityFlowToJobSpecCompilerTest { Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); //Assert the start node has no children. - Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode)); + Assert.assertEquals(jobExecutionPlanDag.getChildren(dagNode).size(), 0); } @Test http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/71184cdb/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java index 688f5de..48922d0 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java @@ -77,8 +77,8 @@ public class DagTest { Assert.assertTrue(childSet.contains("val5")); //Ensure end nodes have no children - Assert.assertNull(dag.getChildren(dagNode4)); - Assert.assertNull(dag.getChildren(dagNode5)); + Assert.assertEquals(dag.getChildren(dagNode4).size(), 0); + Assert.assertEquals(dag.getChildren(dagNode5).size(), 0); } @Test @@ -172,7 +172,7 @@ public class DagTest { Assert.assertEquals(dagNew.getStartNodes().size(), 3); for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode1, dagNode6, dagNode7)) { Assert.assertTrue(dagNew.getStartNodes().contains(dagNode)); - Assert.assertNull(dagNew.getParents(dagNode)); + Assert.assertEquals(dagNew.getParents(dagNode).size(), 0); if (dagNode == dagNode1) { List<Dag.DagNode<String>> nextNodes = dagNew.getChildren(dagNode); Assert.assertEquals(nextNodes.size(), 2); @@ -188,7 +188,7 @@ public 
class DagTest { Assert.assertEquals(dagNew.getEndNodes().size(), 3); for (Dag.DagNode<String> dagNode: Lists.newArrayList(dagNode4, dagNode5, dagNode8)) { Assert.assertTrue(dagNew.getEndNodes().contains(dagNode)); - Assert.assertNull(dagNew.getChildren(dagNode)); + Assert.assertEquals(dagNew.getChildren(dagNode).size(), 0); if (dagNode == dagNode8) { Assert.assertEquals(dagNew.getParents(dagNode).size(), 2); Assert.assertTrue(dagNew.getParents(dagNode).contains(dagNode6));
