This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e9326b  [GOBBLIN-1342] Add API to resume a flow
4e9326b is described below

commit 4e9326bd96ab9b5ec05dd1cd5e8010a7d6175f0c
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Jan 14 12:11:22 2021 -0800

    [GOBBLIN-1342] Add API to resume a flow
    
    Add API to resume a flow
    
    Address comments
    
    Closes #3179 from jack-moseley/resume-flow
---
 .../apache/gobblin/metrics/event/TimingEvent.java  |   2 +
 ...he.gobblin.service.flowexecutions.restspec.json |   5 +-
 .../org/apache/gobblin/service/ExecutionStatus.pdl |   5 +
 ...he.gobblin.service.flowexecutions.snapshot.json |   8 +-
 ...ache.gobblin.service.flowstatuses.snapshot.json |   3 +-
 .../gobblin/service/FlowExecutionClient.java       |  27 ++++
 .../gobblin/service/FlowExecutionResource.java     |  23 ++++
 .../service/FlowExecutionResourceHandler.java      |   5 +
 .../service/FlowExecutionResourceLocalHandler.java |  20 +--
 .../service/monitoring/ResumeFlowEvent.java        |  30 ++++
 .../service/modules/orchestration/DagManager.java  | 151 +++++++++++++++++++--
 .../modules/orchestration/DagManagerUtils.java     |  11 +-
 .../modules/orchestration/Orchestrator.java        |   2 +-
 ...GobblinServiceFlowExecutionResourceHandler.java |  13 ++
 .../service/modules/spec/JobExecutionPlan.java     |   3 +-
 .../spec/JobExecutionPlanListDeserializer.java     |   5 +
 .../spec/JobExecutionPlanListSerializer.java       |   2 +
 .../modules/spec/SerializationConstants.java       |   1 +
 .../monitoring/KafkaAvroJobStatusMonitor.java      |   4 +
 .../service/monitoring/KafkaJobStatusMonitor.java  |   7 +-
 .../modules/orchestration/DagManagerFlowTest.java  |  12 +-
 .../modules/orchestration/DagManagerTest.java      |  86 +++++++++++-
 22 files changed, 382 insertions(+), 43 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 45af9ad..3800c08 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -40,6 +40,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     public static final String WORK_UNITS_CREATION = "WorkUnitsCreationTimer";
     public static final String WORK_UNITS_PREPARATION = 
"WorkUnitsPreparationTimer";
     public static final String JOB_PENDING = "JobPending";
+    public static final String JOB_PENDING_RESUME = "JobPendingResume";
     public static final String JOB_ORCHESTRATED = "JobOrchestrated";
     public static final String JOB_PREPARE = "JobPrepareTimer";
     public static final String JOB_START = "JobStartTimer";
@@ -71,6 +72,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     public static final String FLOW_FAILED = "FlowFailed";
     public static final String FLOW_RUNNING = "FlowRunning";
     public static final String FLOW_CANCELLED = "FlowCancelled";
+    public static final String FLOW_PENDING_RESUME = "FlowPendingResume";
   }
 
   public static class FlowEventConstants {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 2111064..4af7b37 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -10,11 +10,14 @@
       "type" : "org.apache.gobblin.service.FlowStatusId",
       "params" : "com.linkedin.restli.common.EmptyRecord"
     },
-    "supports" : [ "delete", "get" ],
+    "supports" : [ "delete", "get", "partial_update" ],
     "methods" : [ {
       "method" : "get",
       "doc" : "Retrieve the FlowExecution with the given key"
     }, {
+      "method" : "partial_update",
+      "doc" : "Resume a failed {@link FlowExecution} from the point before 
failure. This is specified by a partial update patch which\n sets 
executionStatus to RUNNING."
+    }, {
       "method" : "delete",
       "doc" : "Kill the FlowExecution with the given key"
     } ],
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
index c0e63a9..ed9a59f 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
@@ -21,6 +21,11 @@ enum ExecutionStatus {
   PENDING_RETRY
 
   /**
+   * Flow or job is currently resuming.
+   */
+  PENDING_RESUME
+
+  /**
    * Job(s) orchestrated to spec executors.
    */
   ORCHESTRATED
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 65321d6..97d67f8 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -13,7 +13,7 @@
     "name" : "ExecutionStatus",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Execution status for a flow or job",
-    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", 
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
+    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", 
"ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
     "symbolDocs" : {
       "CANCELLED" : "Flow cancelled.",
       "COMPILED" : "Flow compiled to jobs.",
@@ -21,6 +21,7 @@
       "FAILED" : "Flow or job failed",
       "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
       "PENDING" : "Flow or job is in pending state.",
+      "PENDING_RESUME" : "Flow or job is currently resuming.",
       "PENDING_RETRY" : "Flow or job is pending retry.",
       "RUNNING" : "Flow or job is currently executing"
     }
@@ -215,11 +216,14 @@
         "type" : "org.apache.gobblin.service.FlowStatusId",
         "params" : "com.linkedin.restli.common.EmptyRecord"
       },
-      "supports" : [ "delete", "get" ],
+      "supports" : [ "delete", "get", "partial_update" ],
       "methods" : [ {
         "method" : "get",
         "doc" : "Retrieve the FlowExecution with the given key"
       }, {
+        "method" : "partial_update",
+        "doc" : "Resume a failed {@link FlowExecution} from the point before 
failure. This is specified by a partial update patch which\n sets 
executionStatus to RUNNING."
+      }, {
         "method" : "delete",
         "doc" : "Kill the FlowExecution with the given key"
       } ],
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 034fca0..d3d4dbc 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -13,7 +13,7 @@
     "name" : "ExecutionStatus",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Execution status for a flow or job",
-    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED", 
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
+    "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME", 
"ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
     "symbolDocs" : {
       "CANCELLED" : "Flow cancelled.",
       "COMPILED" : "Flow compiled to jobs.",
@@ -21,6 +21,7 @@
       "FAILED" : "Flow or job failed",
       "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
       "PENDING" : "Flow or job is in pending state.",
+      "PENDING_RESUME" : "Flow or job is currently resuming.",
       "PENDING_RETRY" : "Flow or job is pending retry.",
       "RUNNING" : "Flow or job is currently executing"
     }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index ffae759..8398c54 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -17,12 +17,14 @@
 
 package org.apache.gobblin.service;
 
+import com.linkedin.data.DataMap;
 import com.linkedin.restli.client.DeleteRequest;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,11 +38,15 @@ import 
com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.restli.client.FindRequest;
 import com.linkedin.restli.client.GetRequest;
+import com.linkedin.restli.client.PartialUpdateRequest;
 import com.linkedin.restli.client.Response;
 import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.client.UpdateRequest;
 import com.linkedin.restli.common.CollectionResponse;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.internal.server.util.DataMapUtils;
 
 
 /**
@@ -162,6 +168,27 @@ public class FlowExecutionClient implements Closeable {
   }
 
   /**
+   * Resume the flow with given FlowStatusId from its state before failure
+   * @param flowStatusId identifier of flow execution to resume
+   * @throws RemoteInvocationException
+   */
+  public void resumeFlowExecution(FlowStatusId flowStatusId)
+      throws RemoteInvocationException {
+    LOG.debug("resumeFlowExecution with groupName " + 
flowStatusId.getFlowGroup() + " flowName " +
+        flowStatusId.getFlowName() + " flowExecutionId " + 
flowStatusId.getFlowExecutionId());
+
+    String patchJson = "{\"$set\":{\"executionStatus\":\"RUNNING\"}}";
+    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+    PatchRequest<FlowExecution> flowExecutionPatch = 
PatchRequest.createFromPatchDocument(dataMap);
+
+    PartialUpdateRequest<FlowExecution> partialUpdateRequest =
+        _flowexecutionsRequestBuilders.partialUpdate().id(new 
ComplexResourceKey<>(flowStatusId, new EmptyRecord()))
+            .input(flowExecutionPatch).build();
+
+    FlowClientUtils.sendRequestWithRetry(_restClient.get(), 
partialUpdateRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
+  }
+
+  /**
    * Kill the flow with given FlowStatusId
    * @param flowStatusId identifier of flow execution to kill
    * @throws RemoteInvocationException
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 54922f4..475482a 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -20,8 +20,10 @@ package org.apache.gobblin.service;
 import java.util.List;
 
 import com.google.inject.Inject;
+import com.linkedin.data.DataMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.Context;
@@ -61,6 +63,27 @@ public class FlowExecutionResource extends 
ComplexKeyResourceTemplate<FlowStatus
   }
 
   /**
+   * Resume a failed {@link FlowExecution} from the point before failure. This 
is specified by a partial update patch which
+   * sets executionStatus to RUNNING.
+   * @param key {@link FlowStatusId} of flow to resume
+   * @param flowExecutionPatch {@link PatchRequest} which is expected to set 
executionStatus to RUNNING
+   * @return {@link UpdateResponse}
+   */
+  @Override
+  public UpdateResponse update(ComplexResourceKey<FlowStatusId, EmptyRecord> 
key, PatchRequest<FlowExecution> flowExecutionPatch) {
+    DataMap dataMap = flowExecutionPatch.getPatchDocument().getDataMap("$set");
+    if (dataMap != null) {
+      String status = dataMap.getString("executionStatus");
+      if (status != null && 
status.equalsIgnoreCase(ExecutionStatus.RUNNING.name())) {
+        return this.flowExecutionResourceHandler.resume(key);
+      }
+    }
+
+    throw new UnsupportedOperationException("Only flow resume is supported for 
FlowExecution update, which is specified by "
+        + "setting executionStatus field to RUNNING");
+  }
+
+  /**
    * Kill the FlowExecution with the given key
    * @param key {@link FlowStatusId} of flow to kill
    * @return {@link UpdateResponse}
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
index 918f5ac..e79eb5f 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -46,6 +46,11 @@ public interface FlowExecutionResourceHandler {
   public List<FlowExecution> getLatestFlowExecution(PagingContext context, 
FlowId flowId, Integer count, String tag, String executionStatus);
 
   /**
+   * Resume a failed {@link FlowExecution} from the point before failure
+   */
+  public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> 
key);
+
+  /**
    * Kill a running {@link FlowExecution}
    */
   public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> 
key);
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index 318289d..d83f70d 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -15,39 +15,26 @@
  * limitations under the License.
  */
 package org.apache.gobblin.service;
+
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringEscapeUtils;
-
 import com.google.common.base.Strings;
 import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
-import com.linkedin.restli.common.PatchRequest;
-import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.annotations.Context;
-import com.linkedin.restli.server.annotations.Optional;
-import com.linkedin.restli.server.annotations.QueryParam;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.monitoring.FlowStatus;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
-import org.apache.gobblin.service.monitoring.KillFlowEvent;
 
 
 @Slf4j
@@ -82,6 +69,11 @@ public class FlowExecutionResourceLocalHandler implements 
FlowExecutionResourceH
   }
 
   @Override
+  public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> 
key) {
+    throw new UnsupportedOperationException("Resume should be handled in 
GobblinServiceFlowConfigResourceHandler");
+  }
+
+  @Override
   public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> 
key) {
     throw new UnsupportedOperationException("Delete should be handled in 
GobblinServiceFlowConfigResourceHandler");
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/ResumeFlowEvent.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/ResumeFlowEvent.java
new file mode 100644
index 0000000..e5aa19a
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/ResumeFlowEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+
+@AllArgsConstructor
+@Data
+public class ResumeFlowEvent {
+  private String flowGroup;
+  private String flowName;
+  private Long flowExecutionId;
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 914ebb5..a925450 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -79,6 +79,7 @@ import 
org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
 import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -125,6 +126,7 @@ public class DagManager extends AbstractIdleService {
   private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = 
JOB_STATUS_RETRIEVER_KEY + ".class";
   private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = 
FsJobStatusRetriever.class.getName();
   private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + 
"dagStateStoreClass";
+  private static final String FAILED_DAG_STATESTORE_PREFIX = 
"failedDagStateStore";
   private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + 
"defaultJobQuota";
   private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
   private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + 
"perUserQuota";
@@ -158,11 +160,13 @@ public class DagManager extends AbstractIdleService {
 
   private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
   private BlockingQueue<String>[] cancelQueue;
+  private BlockingQueue<String>[] resumeQueue;
   DagManagerThread[] dagManagerThreads;
 
   private ScheduledExecutorService scheduledExecutorPool;
   private boolean instrumentationEnabled;
   private DagStateStore dagStateStore;
+  private DagStateStore failedDagStateStore;
   private Map<URI, TopologySpec> topologySpecMap;
 
   @Getter
@@ -182,6 +186,7 @@ public class DagManager extends AbstractIdleService {
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
     this.queue = initializeDagQueue(this.numThreads);
     this.cancelQueue = initializeDagQueue(this.numThreads);
+    this.resumeQueue = initializeDagQueue(this.numThreads);
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.instrumentationEnabled = instrumentationEnabled;
@@ -251,8 +256,9 @@ public class DagManager extends AbstractIdleService {
    * by one of the {@link DagManagerThread}s.
    * @param dag {@link Dag} to be added
    * @param persist whether to persist the dag to the {@link DagStateStore}
+   * @param setStatus if true, set all jobs in the dag to pending
    */
-  synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist) throws 
IOException {
+  synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean 
setStatus) throws IOException {
     if (persist) {
       //Persist the dag
       this.dagStateStore.writeCheckpoint(dag);
@@ -265,7 +271,9 @@ public class DagManager extends AbstractIdleService {
     if (!this.queue[queueId].offer(dag)) {
       throw new IOException("Could not add dag" + 
DagManagerUtils.generateDagId(dag) + "to queue");
     }
-    submitEventsAndSetStatus(dag);
+    if (setStatus) {
+      submitEventsAndSetStatus(dag);
+    }
   }
 
   private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
@@ -316,6 +324,16 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  @Subscribe
+  public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
+    log.info("Received resume request for flow ({}, {}, {})", 
resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), 
resumeFlowEvent.getFlowExecutionId());
+    String dagId = 
DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), 
resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
+    int queueId = 
DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), 
this.numThreads);
+    if (!this.resumeQueue[queueId].offer(dagId)) {
+      log.warn("Could not add dag " + dagId + " to resume queue");
+    }
+  }
+
   public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) {
     this.topologySpecMap = topologySpecMap;
   }
@@ -338,19 +356,20 @@ public class DagManager extends AbstractIdleService {
         log.info("Scheduling {} DagManager threads", numThreads);
         //Initializing state store for persisting Dags.
         this.dagStateStore = createDagStateStore(config, topologySpecMap);
+        this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
 
         //On startup, the service creates DagManagerThreads that are scheduled 
at a fixed rate.
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
-          DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore,
-              queue[i], cancelQueue[i], instrumentationEnabled, defaultQuota, 
perUserQuota);
+          DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
+              queue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, defaultQuota, perUserQuota);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
         }
         List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
         log.info("Loading " + dags.size() + " dags from dag state store");
         for (Dag<JobExecutionPlan> dag : dags) {
-          addDag(dag, false);
+          addDag(dag, false, false);
         }
       } else { //Mark the DagManager inactive.
         log.info("Inactivating the DagManager. Shutting down all DagManager 
threads");
@@ -380,6 +399,8 @@ public class DagManager extends AbstractIdleService {
     private static final Map<String, Integer> proxyUserToJobCount = new 
ConcurrentHashMap<>();
     private static final Map<String, Integer> requesterToJobCount = new 
ConcurrentHashMap<>();
     private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+    private final Map<String, Dag<JobExecutionPlan>> failedDags = new 
HashMap<>();
+    private final Map<String, Dag<JobExecutionPlan>> resumingDags = new 
HashMap<>();
     // dagToJobs holds a map of dagId to running jobs of that dag
     final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new 
HashMap<>();
     final Map<String, Long> dagToSLA = new HashMap<>();
@@ -394,19 +415,30 @@ public class DagManager extends AbstractIdleService {
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
+    private DagStateStore failedDagStateStore;
     private BlockingQueue<Dag<JobExecutionPlan>> queue;
     private BlockingQueue<String> cancelQueue;
+    private BlockingQueue<String> resumeQueue;
 
     /**
      * Constructor.
      */
-    DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore,
-        BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, boolean instrumentationEnabled,
-        int defaultQuota, Map<String, Integer> perUserQuota) {
+    DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore, DagStateStore failedDagStateStore,
+        BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> 
cancelQueue, BlockingQueue<String> resumeQueue,
+        boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> 
perUserQuota) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
+      this.failedDagStateStore = failedDagStateStore;
+      try {
+        for (Dag<JobExecutionPlan> dag : failedDagStateStore.getDags()) {
+          this.failedDags.put(DagManagerUtils.generateDagId(dag), dag);
+        }
+      } catch (IOException e) {
+        log.error("Failed to load previously failed dags into memory", e);
+      }
       this.queue = queue;
       this.cancelQueue = cancelQueue;
+      this.resumeQueue = resumeQueue;
       this.defaultQuota = defaultQuota;
       this.perUserQuota = perUserQuota;
       if (instrumentationEnabled) {
@@ -449,6 +481,13 @@ public class DagManager extends AbstractIdleService {
           }
         }
 
+        while (!resumeQueue.isEmpty()) {
+          String dagId = resumeQueue.poll();
+          beginResumingDag(dagId);
+        }
+
+        finishResumingDags();
+
         log.debug("Polling job statuses..");
         //Poll and update the job statuses of running jobs.
         pollAndAdvanceDag();
@@ -463,6 +502,68 @@ public class DagManager extends AbstractIdleService {
     }
 
     /**
+     * Begin resuming a dag by setting the status of both the dag and the 
failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+     * and also sending events so that this status will be reflected in the 
job status state store.
+     */
+    private void beginResumingDag(String dagId) {
+      Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+      if (dag == null) {
+        log.warn("No dag found with dagId " + dagId + ", so cannot resume 
flow");
+        return;
+      }
+
+      long flowResumeTime = System.currentTimeMillis();
+
+      // Set the flow and its failed or cancelled nodes to PENDING_RESUME so 
that the flow will be resumed from the point before it failed
+      DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, 
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+      for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+        ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+        if (executionStatus.equals(FAILED) || 
executionStatus.equals(CANCELLED)) {
+          node.getValue().setExecutionStatus(PENDING_RESUME);
+        }
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+        
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+        // Set flowStartTime so that flow SLA will be based on current time 
instead of original flow
+        node.getValue().setFlowStartTime(flowResumeTime);
+      }
+
+      this.resumingDags.put(dagId, dag);
+    }
+
+    /**
+     * Finish resuming dags by first verifying the status is correct (flow 
should be {@link ExecutionStatus#PENDING_RESUME}
+     * and jobs should not be {@link ExecutionStatus#FAILED} or {@link 
ExecutionStatus#CANCELLED}) and then calling
+     * {@link #initialize}. This is separated from {@link #beginResumingDag} 
because it could take some time for the
+     * job status state store to reflect the updated status.
+     */
+    private void finishResumingDags() throws IOException {
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dag : 
this.resumingDags.entrySet()) {
+        JobStatus flowStatus = pollFlowStatus(dag.getValue());
+        if (flowStatus == null || 
!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+          continue;
+        }
+
+        boolean dagReady = true;
+        for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
+          JobStatus jobStatus = pollJobStatus(node);
+          if (jobStatus == null || 
jobStatus.getEventName().equals(FAILED.name()) || 
jobStatus.getEventName().equals(CANCELLED.name())) {
+            dagReady = false;
+            break;
+          }
+        }
+
+        if (dagReady) {
+          this.dagStateStore.writeCheckpoint(dag.getValue());
+          this.failedDagStateStore.cleanUp(dag.getValue());
+          this.failedDags.remove(dag.getKey());
+          this.resumingDags.remove(dag.getKey());
+          initialize(dag.getValue());
+        }
+      }
+    }
+
+    /**
      * Cancels the dag and sends a cancellation tracking event.
      * @param dagToCancel dag node to cancel
      * @throws ExecutionException executionException
@@ -713,6 +814,25 @@ public class DagManager extends AbstractIdleService {
       String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
       String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
 
+      return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup, 
jobName);
+    }
+
+    /**
+     * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link 
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+     */
+    private JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
+      if (dag == null || dag.isEmpty()) {
+        return null;
+      }
+      Config jobConfig = 
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+      String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      long flowExecutionId = 
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+      return pollStatus(flowGroup, flowName, flowExecutionId, 
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
+    }
+
+    private JobStatus pollStatus(String flowGroup, String flowName, long 
flowExecutionId, String jobGroup, String jobName) {
       long pollStartTime = System.nanoTime();
       Iterator<JobStatus> jobStatusIterator =
           this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId, jobName, jobGroup);
@@ -992,6 +1112,7 @@ public class DagManager extends AbstractIdleService {
       List<String> dagIdstoClean = new ArrayList<>();
       //Clean up failed dags
       for (String dagId : this.failedDagIdsFinishRunning) {
+        addFailedDag(dagId);
         //Skip monitoring of any other jobs of the failed dag.
         LinkedList<DagNode<JobExecutionPlan>> dagNodeList = 
this.dagToJobs.get(dagId);
         while (!dagNodeList.isEmpty()) {
@@ -1009,6 +1130,7 @@ public class DagManager extends AbstractIdleService {
         if (!hasRunningJobs(dagId) && 
!this.failedDagIdsFinishRunning.contains(dagId)) {
           String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
           if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
+            addFailedDag(dagId);
             status = TimingEvent.FlowTimings.FLOW_FAILED;
             this.failedDagIdsFinishAllPossible.remove(dagId);
           }
@@ -1025,6 +1147,19 @@ public class DagManager extends AbstractIdleService {
     }
 
     /**
+     * Add a dag to failed dag state store
+     */
+    private synchronized void addFailedDag(String dagId) {
+      try {
+        log.info("Adding dag " + dagId + " to failed dag state store");
+        this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
+      } catch (IOException e) {
+        log.error("Failed to add dag " + dagId + " to failed dag state store", 
e);
+      }
+      this.failedDags.put(dagId, this.dags.get(dagId));
+    }
+
+    /**
     * Note that removal of a {@link Dag} entry in {@link #dags} needs to 
happen after {@link #cleanUp()},
      * since the real {@link Dag} object is required for {@link #cleanUp()},
      * and cleaning of all relevant states need to be atomic
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 248fe4d..b55b0e1 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -185,7 +185,8 @@ public class DagManagerUtils {
       DagNode<JobExecutionPlan> node = nodesToExpand.poll();
       ExecutionStatus executionStatus = getExecutionStatus(node);
       boolean addFlag = true;
-      if (executionStatus == ExecutionStatus.PENDING || executionStatus == 
ExecutionStatus.PENDING_RETRY) {
+      if (executionStatus == ExecutionStatus.PENDING || executionStatus == 
ExecutionStatus.PENDING_RETRY
+          || executionStatus == ExecutionStatus.PENDING_RESUME) {
         //Add a node to be executed next, only if all of its parent nodes are 
COMPLETE.
         List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
         for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
@@ -243,12 +244,14 @@ public class DagManagerUtils {
   }
 
   /**
-   * flow start time is assumed to be same the flow execution id which is 
timestamp flow request was received
+   * Flow start time is the same as the flow execution id, which is the 
timestamp at which the flow request was received, unless it
+   * is a resumed flow, in which case it is {@link 
JobExecutionPlan#getFlowStartTime()}
    * @param dagNode dag node in context
-   * @return flow execution id
+   * @return flow start time
    */
   static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
-    return getFlowExecId(dagNode);
+    long flowStartTime = dagNode.getValue().getFlowStartTime();
+    return flowStartTime == 0L ? getFlowExecId(dagNode) : flowStartTime;
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index e6afe93..f790361 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -302,7 +302,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
       if (this.dagManager.isPresent()) {
         //Send the dag to the DagManager.
-        this.dagManager.get().addDag(jobExecutionPlanDag, true);
+        this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
         for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 2a94fb0..0e3b8fe 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -37,6 +37,7 @@ import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.FlowStatusId;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
 
 
 /**
@@ -70,6 +71,18 @@ public class GobblinServiceFlowExecutionResourceHandler 
implements FlowExecution
   }
 
   @Override
+  public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> 
key) {
+    String flowGroup = key.getKey().getFlowGroup();
+    String flowName = key.getKey().getFlowName();
+    Long flowExecutionId = key.getKey().getFlowExecutionId();
+    if (this.forceLeader) {
+      HelixUtils.throwErrorIfNotLeader(this.helixManager);
+    }
+    this.eventBus.post(new ResumeFlowEvent(flowGroup, flowName, 
flowExecutionId));
+    return new UpdateResponse(HttpStatus.S_200_OK);
+  }
+
+  @Override
   public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord> 
key) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index f88b561..0cc4be3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -57,7 +57,7 @@ import static 
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
  * where the {@link JobSpec} will be executed.
  */
 @Data
-@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", 
"jobFuture"})
+@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", 
"jobFuture", "flowStartTime"})
 public class JobExecutionPlan {
   public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
 
@@ -67,6 +67,7 @@ public class JobExecutionPlan {
   private final int maxAttempts;
   private int currentAttempts = 0;
   private Optional<Future> jobFuture = Optional.absent();
+  private long flowStartTime = 0L;
 
   public static class Factory {
     public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index 1f988c2..ede2870 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -111,6 +111,11 @@ public class JobExecutionPlanListDeserializer implements 
JsonDeserializer<List<J
       JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, 
specExecutor);
       jobExecutionPlan.setExecutionStatus(executionStatus);
 
+      JsonElement flowStartTime = 
serializedJobExecutionPlan.get(SerializationConstants.FLOW_START_TIME_KEY);
+      if (flowStartTime != null) {
+        jobExecutionPlan.setFlowStartTime(flowStartTime.getAsLong());
+      }
+
       try {
         String jobExecutionFuture = 
serializedJobExecutionPlan.get(SerializationConstants.JOB_EXECUTION_FUTURE).getAsString();
         Future future = 
specExecutor.getProducer().get().deserializeAddSpecResponse(jobExecutionFuture);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
index 5a38c19..3d2fe36 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -83,6 +83,8 @@ public class JobExecutionPlanListSerializer implements 
JsonSerializer<List<JobEx
       String executionStatus = jobExecutionPlan.getExecutionStatus().name();
       
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY, 
executionStatus);
 
+      
jobExecutionPlanJson.addProperty(SerializationConstants.FLOW_START_TIME_KEY, 
jobExecutionPlan.getFlowStartTime());
+
       try {
         String jobExecutionFuture = 
jobExecutionPlan.getSpecExecutor().getProducer().get()
             
.serializeAddSpecResponse(jobExecutionPlan.getJobFuture().orNull());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index 5359465..7cccf9f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -32,4 +32,5 @@ public class SerializationConstants {
 
   public static final String EXECUTION_STATUS_KEY = "executionStatus";
   public static final String JOB_EXECUTION_FUTURE = "jobExecutionFuture";
+  public static final String FLOW_START_TIME_KEY = "flowStartTime";
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 3ea701c..def4943 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -126,6 +126,10 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
       case TimingEvent.LauncherTimings.JOB_PENDING:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.PENDING.name());
         break;
+      case TimingEvent.FlowTimings.FLOW_PENDING_RESUME:
+      case TimingEvent.LauncherTimings.JOB_PENDING_RESUME:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.PENDING_RESUME.name());
+        break;
       case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.ORCHESTRATED.name());
         properties.put(TimingEvent.JOB_ORCHESTRATED_TIME, 
properties.getProperty(TimingEvent.METADATA_END_TIME));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 68f48c9..abd15aa 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -84,7 +84,7 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
 
   private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES = 
ImmutableList
-      .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, 
ExecutionStatus.PENDING_RETRY,
+      .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING, 
ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
           ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING, 
ExecutionStatus.COMPLETE,
           ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
 
@@ -173,8 +173,9 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
       String previousStatus = states.get(states.size() - 
1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
       String currentStatus = 
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
 
-      if (previousStatus != null && currentStatus != null && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
-          < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
+      // PENDING_RESUME is allowed to override, because it happens when a flow 
is being resumed after previously failing
+      if (previousStatus != null && currentStatus != null && 
!currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
+        && 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) < 
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
         log.warn(String.format("Received status %s when status is already %s 
for flow (%s, %s, %s), job (%s, %s)",
             currentStatus, previousStatus, flowGroup, flowName, 
flowExecutionId, jobGroup, jobName));
         jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index ecd90fa..f9b8bd9 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -94,9 +94,9 @@ public class DagManagerFlowTest {
         .thenReturn(Collections.singletonList(flowExecutionId3));
 
     // mock add spec
-    dagManager.addDag(dag1, true);
-    dagManager.addDag(dag2, true);
-    dagManager.addDag(dag3, true);
+    dagManager.addDag(dag1, true, true);
+    dagManager.addDag(dag2, true, true);
+    dagManager.addDag(dag3, true, true);
 
     // check existence of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -152,7 +152,7 @@ public class DagManagerFlowTest {
         .thenReturn(Collections.singletonList(flowExecutionId));
 
     // mock add spec
-    dagManager.addDag(dag, true);
+    dagManager.addDag(dag, true, true);
 
     // check existence of dag in dagToJobs map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -196,7 +196,7 @@ public class DagManagerFlowTest {
     dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
 
     // mock add spec
-    dagManager.addDag(dag, true);
+    dagManager.addDag(dag, true, true);
 
     // check existence of dag in dagToSLA map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -233,7 +233,7 @@ public class DagManagerFlowTest {
     dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
 
     // mock add spec
-    dagManager.addDag(dag, true);
+    dagManager.addDag(dag, true, true);
 
     // check existence of dag in dagToSLA map
     AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 60f25d1..a39457f 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -63,9 +63,11 @@ public class DagManagerTest {
   private DagManager.DagManagerThread _dagManagerThread;
   private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
   private LinkedBlockingQueue<String> cancelQueue;
+  private LinkedBlockingQueue<String> resumeQueue;
   private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
   private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
   private Map<String, Dag<JobExecutionPlan>> dags;
+  private Map<String, Dag<JobExecutionPlan>> failedDags;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -77,8 +79,9 @@ public class DagManagerTest {
     this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     this.queue = new LinkedBlockingQueue<>();
     this.cancelQueue = new LinkedBlockingQueue<>();
-    this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, 
cancelQueue,
-        true, 5, new HashMap<>());
+    this.resumeQueue = new LinkedBlockingQueue<>();
+    this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
_dagStateStore, queue, cancelQueue,
+        resumeQueue, true, 5, new HashMap<>());
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
@@ -92,6 +95,9 @@ public class DagManagerTest {
     dagsField.setAccessible(true);
     this.dags = (Map<String, Dag<JobExecutionPlan>>) 
dagsField.get(this._dagManagerThread);
 
+    Field failedDagsField = 
DagManager.DagManagerThread.class.getDeclaredField("failedDags");
+    failedDagsField.setAccessible(true);
+    this.failedDags = (Map<String, Dag<JobExecutionPlan>>) 
failedDagsField.get(this._dagManagerThread);
   }
 
   /**
@@ -327,6 +333,82 @@ public class DagManagerTest {
   }
 
   @Test (dependsOnMethods = "testFailedDag")
+  public void testResumeDag() throws URISyntaxException, IOException {
+    long flowExecutionId = System.currentTimeMillis();
+    String flowGroupId = "0";
+    String flowGroup = "group" + flowGroupId;
+    String flowName = "flow" + flowGroupId;
+    String jobName0 = "job0";
+    String jobName1 = "job1";
+    String jobName2 = "job2";
+
+    Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId, 
"FINISH_RUNNING", true);
+    String dagId = DagManagerUtils.generateDagId(dag);
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dag);
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator5 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator6 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.FAILED));
+    Iterator<JobStatus> jobStatusIterator7 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY", 
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
+    Iterator<JobStatus> jobStatusIterator8 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator9 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+    Iterator<JobStatus> jobStatusIterator10 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+    Iterator<JobStatus> jobStatusIterator11 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+    Iterator<JobStatus> jobStatusIterator12 =
+        getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2, 
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), 
Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4).
+        thenReturn(jobStatusIterator5).
+        thenReturn(jobStatusIterator6).
+        thenReturn(jobStatusIterator7).
+        thenReturn(jobStatusIterator8).
+        thenReturn(jobStatusIterator9).
+        thenReturn(jobStatusIterator10).
+        thenReturn(jobStatusIterator11).
+        thenReturn(jobStatusIterator12);
+
+    // Run thread until job2 fails
+    for (int i = 0; i < 4; i++) {
+      this._dagManagerThread.run();
+    }
+
+    Assert.assertTrue(this.failedDags.containsKey(dagId));
+
+    // Resume dag
+    this.resumeQueue.offer(dagId);
+
+    // Job2 rerunning
+    this._dagManagerThread.run();
+    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertTrue(this.dags.containsKey(dagId));
+
+    // Job2 complete
+    this._dagManagerThread.run();
+    Assert.assertFalse(this.failedDags.containsKey(dagId));
+    Assert.assertFalse(this.dags.containsKey(dagId));
+  }
+
+  @Test (dependsOnMethods = "testResumeDag")
   public void testSucceedAfterRetry() throws Exception {
     long flowExecutionId = System.currentTimeMillis();
     String flowGroupId = "0";

Reply via email to