This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4e9326b [GOBBLIN-1342] Add API to resume a flow
4e9326b is described below
commit 4e9326bd96ab9b5ec05dd1cd5e8010a7d6175f0c
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Jan 14 12:11:22 2021 -0800
[GOBBLIN-1342] Add API to resume a flow
Add API to resume a flow
Address comments
Closes #3179 from jack-moseley/resume-flow
---
.../apache/gobblin/metrics/event/TimingEvent.java | 2 +
...he.gobblin.service.flowexecutions.restspec.json | 5 +-
.../org/apache/gobblin/service/ExecutionStatus.pdl | 5 +
...he.gobblin.service.flowexecutions.snapshot.json | 8 +-
...ache.gobblin.service.flowstatuses.snapshot.json | 3 +-
.../gobblin/service/FlowExecutionClient.java | 27 ++++
.../gobblin/service/FlowExecutionResource.java | 23 ++++
.../service/FlowExecutionResourceHandler.java | 5 +
.../service/FlowExecutionResourceLocalHandler.java | 20 +--
.../service/monitoring/ResumeFlowEvent.java | 30 ++++
.../service/modules/orchestration/DagManager.java | 151 +++++++++++++++++++--
.../modules/orchestration/DagManagerUtils.java | 11 +-
.../modules/orchestration/Orchestrator.java | 2 +-
...GobblinServiceFlowExecutionResourceHandler.java | 13 ++
.../service/modules/spec/JobExecutionPlan.java | 3 +-
.../spec/JobExecutionPlanListDeserializer.java | 5 +
.../spec/JobExecutionPlanListSerializer.java | 2 +
.../modules/spec/SerializationConstants.java | 1 +
.../monitoring/KafkaAvroJobStatusMonitor.java | 4 +
.../service/monitoring/KafkaJobStatusMonitor.java | 7 +-
.../modules/orchestration/DagManagerFlowTest.java | 12 +-
.../modules/orchestration/DagManagerTest.java | 86 +++++++++++-
22 files changed, 382 insertions(+), 43 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 45af9ad..3800c08 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -40,6 +40,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String WORK_UNITS_CREATION = "WorkUnitsCreationTimer";
public static final String WORK_UNITS_PREPARATION =
"WorkUnitsPreparationTimer";
public static final String JOB_PENDING = "JobPending";
+ public static final String JOB_PENDING_RESUME = "JobPendingResume";
public static final String JOB_ORCHESTRATED = "JobOrchestrated";
public static final String JOB_PREPARE = "JobPrepareTimer";
public static final String JOB_START = "JobStartTimer";
@@ -71,6 +72,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String FLOW_FAILED = "FlowFailed";
public static final String FLOW_RUNNING = "FlowRunning";
public static final String FLOW_CANCELLED = "FlowCancelled";
+ public static final String FLOW_PENDING_RESUME = "FlowPendingResume";
}
public static class FlowEventConstants {
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 2111064..4af7b37 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -10,11 +10,14 @@
"type" : "org.apache.gobblin.service.FlowStatusId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
- "supports" : [ "delete", "get" ],
+ "supports" : [ "delete", "get", "partial_update" ],
"methods" : [ {
"method" : "get",
"doc" : "Retrieve the FlowExecution with the given key"
}, {
+ "method" : "partial_update",
+ "doc" : "Resume a failed {@link FlowExecution} from the point before
failure. This is specified by a partial update patch which\n sets
executionStatus to RUNNING."
+ }, {
"method" : "delete",
"doc" : "Kill the FlowExecution with the given key"
} ],
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
index c0e63a9..ed9a59f 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdl
@@ -21,6 +21,11 @@ enum ExecutionStatus {
PENDING_RETRY
/**
+ * Flow or job is currently resuming.
+ */
+ PENDING_RESUME
+
+ /**
* Job(s) orchestrated to spec executors.
*/
ORCHESTRATED
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 65321d6..97d67f8 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
- "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED",
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
+ "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME",
"ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
@@ -21,6 +21,7 @@
"FAILED" : "Flow or job failed",
"ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
"PENDING" : "Flow or job is in pending state.",
+ "PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
}
@@ -215,11 +216,14 @@
"type" : "org.apache.gobblin.service.FlowStatusId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
- "supports" : [ "delete", "get" ],
+ "supports" : [ "delete", "get", "partial_update" ],
"methods" : [ {
"method" : "get",
"doc" : "Retrieve the FlowExecution with the given key"
}, {
+ "method" : "partial_update",
+ "doc" : "Resume a failed {@link FlowExecution} from the point before
failure. This is specified by a partial update patch which\n sets
executionStatus to RUNNING."
+ }, {
"method" : "delete",
"doc" : "Kill the FlowExecution with the given key"
} ],
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 034fca0..d3d4dbc 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -13,7 +13,7 @@
"name" : "ExecutionStatus",
"namespace" : "org.apache.gobblin.service",
"doc" : "Execution status for a flow or job",
- "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "ORCHESTRATED",
"RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
+ "symbols" : [ "COMPILED", "PENDING", "PENDING_RETRY", "PENDING_RESUME",
"ORCHESTRATED", "RUNNING", "COMPLETE", "FAILED", "CANCELLED" ],
"symbolDocs" : {
"CANCELLED" : "Flow cancelled.",
"COMPILED" : "Flow compiled to jobs.",
@@ -21,6 +21,7 @@
"FAILED" : "Flow or job failed",
"ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
"PENDING" : "Flow or job is in pending state.",
+ "PENDING_RESUME" : "Flow or job is currently resuming.",
"PENDING_RETRY" : "Flow or job is pending retry.",
"RUNNING" : "Flow or job is currently executing"
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index ffae759..8398c54 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -17,12 +17,14 @@
package org.apache.gobblin.service;
+import com.linkedin.data.DataMap;
import com.linkedin.restli.client.DeleteRequest;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,11 +38,15 @@ import
com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.FindRequest;
import com.linkedin.restli.client.GetRequest;
+import com.linkedin.restli.client.PartialUpdateRequest;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.RestClient;
+import com.linkedin.restli.client.UpdateRequest;
import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.internal.server.util.DataMapUtils;
/**
@@ -162,6 +168,27 @@ public class FlowExecutionClient implements Closeable {
}
/**
+ * Resume the flow with given FlowStatusId from its state before failure
+ * @param flowStatusId identifier of flow execution to resume
+ * @throws RemoteInvocationException
+ */
+ public void resumeFlowExecution(FlowStatusId flowStatusId)
+ throws RemoteInvocationException {
+ LOG.debug("resumeFlowExecution with groupName " +
flowStatusId.getFlowGroup() + " flowName " +
+ flowStatusId.getFlowName() + " flowExecutionId " +
flowStatusId.getFlowExecutionId());
+
+ String patchJson = "{\"$set\":{\"executionStatus\":\"RUNNING\"}}";
+ DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
+ PatchRequest<FlowExecution> flowExecutionPatch =
PatchRequest.createFromPatchDocument(dataMap);
+
+ PartialUpdateRequest<FlowExecution> partialUpdateRequest =
+ _flowexecutionsRequestBuilders.partialUpdate().id(new
ComplexResourceKey<>(flowStatusId, new EmptyRecord()))
+ .input(flowExecutionPatch).build();
+
+ FlowClientUtils.sendRequestWithRetry(_restClient.get(),
partialUpdateRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
+ }
+
+ /**
* Kill the flow with given FlowStatusId
* @param flowStatusId identifier of flow execution to kill
* @throws RemoteInvocationException
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 54922f4..475482a 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -20,8 +20,10 @@ package org.apache.gobblin.service;
import java.util.List;
import com.google.inject.Inject;
+import com.linkedin.data.DataMap;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.Context;
@@ -61,6 +63,27 @@ public class FlowExecutionResource extends
ComplexKeyResourceTemplate<FlowStatus
}
/**
+ * Resume a failed {@link FlowExecution} from the point before failure. This
is specified by a partial update patch which
+ * sets executionStatus to RUNNING.
+ * @param key {@link FlowStatusId} of flow to resume
+ * @param flowExecutionPatch {@link PatchRequest} which is expected to set
executionStatus to RUNNING
+ * @return {@link UpdateResponse}
+ */
+ @Override
+ public UpdateResponse update(ComplexResourceKey<FlowStatusId, EmptyRecord>
key, PatchRequest<FlowExecution> flowExecutionPatch) {
+ DataMap dataMap = flowExecutionPatch.getPatchDocument().getDataMap("$set");
+ if (dataMap != null) {
+ String status = dataMap.getString("executionStatus");
+ if (status != null &&
status.equalsIgnoreCase(ExecutionStatus.RUNNING.name())) {
+ return this.flowExecutionResourceHandler.resume(key);
+ }
+ }
+
+ throw new UnsupportedOperationException("Only flow resume is supported for
FlowExecution update, which is specified by "
+ + "setting executionStatus field to RUNNING");
+ }
+
+ /**
* Kill the FlowExecution with the given key
* @param key {@link FlowStatusId} of flow to kill
* @return {@link UpdateResponse}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
index 918f5ac..e79eb5f 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -46,6 +46,11 @@ public interface FlowExecutionResourceHandler {
public List<FlowExecution> getLatestFlowExecution(PagingContext context,
FlowId flowId, Integer count, String tag, String executionStatus);
/**
+ * Resume a failed {@link FlowExecution} from the point before failure
+ */
+ public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord>
key);
+
+ /**
* Kill a running {@link FlowExecution}
*/
public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord>
key);
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index 318289d..d83f70d 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -15,39 +15,26 @@
* limitations under the License.
*/
package org.apache.gobblin.service;
+
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringEscapeUtils;
-
import com.google.common.base.Strings;
import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
-import com.linkedin.restli.common.PatchRequest;
-import com.linkedin.restli.server.CreateKVResponse;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.annotations.Context;
-import com.linkedin.restli.server.annotations.Optional;
-import com.linkedin.restli.server.annotations.QueryParam;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
-import org.apache.gobblin.service.monitoring.KillFlowEvent;
@Slf4j
@@ -82,6 +69,11 @@ public class FlowExecutionResourceLocalHandler implements
FlowExecutionResourceH
}
@Override
+ public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord>
key) {
+ throw new UnsupportedOperationException("Resume should be handled in
GobblinServiceFlowConfigResourceHandler");
+ }
+
+ @Override
public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord>
key) {
throw new UnsupportedOperationException("Delete should be handled in
GobblinServiceFlowConfigResourceHandler");
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/ResumeFlowEvent.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/ResumeFlowEvent.java
new file mode 100644
index 0000000..e5aa19a
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/ResumeFlowEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+
+@AllArgsConstructor
+@Data
+public class ResumeFlowEvent {
+ private String flowGroup;
+ private String flowName;
+ private Long flowExecutionId;
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 914ebb5..a925450 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -79,6 +79,7 @@ import
org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -125,6 +126,7 @@ public class DagManager extends AbstractIdleService {
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY =
JOB_STATUS_RETRIEVER_KEY + ".class";
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS =
FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX +
"dagStateStoreClass";
+ private static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX +
"defaultJobQuota";
private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX +
"perUserQuota";
@@ -158,11 +160,13 @@ public class DagManager extends AbstractIdleService {
private BlockingQueue<Dag<JobExecutionPlan>>[] queue;
private BlockingQueue<String>[] cancelQueue;
+ private BlockingQueue<String>[] resumeQueue;
DagManagerThread[] dagManagerThreads;
private ScheduledExecutorService scheduledExecutorPool;
private boolean instrumentationEnabled;
private DagStateStore dagStateStore;
+ private DagStateStore failedDagStateStore;
private Map<URI, TopologySpec> topologySpecMap;
@Getter
@@ -182,6 +186,7 @@ public class DagManager extends AbstractIdleService {
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
this.queue = initializeDagQueue(this.numThreads);
this.cancelQueue = initializeDagQueue(this.numThreads);
+ this.resumeQueue = initializeDagQueue(this.numThreads);
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
@@ -251,8 +256,9 @@ public class DagManager extends AbstractIdleService {
* by one of the {@link DagManagerThread}s.
* @param dag {@link Dag} to be added
* @param persist whether to persist the dag to the {@link DagStateStore}
+ * @param setStatus if true, set all jobs in the dag to pending
*/
- synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist) throws
IOException {
+ synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean
setStatus) throws IOException {
if (persist) {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
@@ -265,7 +271,9 @@ public class DagManager extends AbstractIdleService {
if (!this.queue[queueId].offer(dag)) {
throw new IOException("Could not add dag" +
DagManagerUtils.generateDagId(dag) + "to queue");
}
- submitEventsAndSetStatus(dag);
+ if (setStatus) {
+ submitEventsAndSetStatus(dag);
+ }
}
private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
@@ -316,6 +324,16 @@ public class DagManager extends AbstractIdleService {
}
}
+ @Subscribe
+ public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
+ log.info("Received resume request for flow ({}, {}, {})",
resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(),
resumeFlowEvent.getFlowExecutionId());
+ String dagId =
DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(),
resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
+ int queueId =
DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(),
this.numThreads);
+ if (!this.resumeQueue[queueId].offer(dagId)) {
+ log.warn("Could not add dag " + dagId + " to resume queue");
+ }
+ }
+
public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) {
this.topologySpecMap = topologySpecMap;
}
@@ -338,19 +356,20 @@ public class DagManager extends AbstractIdleService {
log.info("Scheduling {} DagManager threads", numThreads);
//Initializing state store for persisting Dags.
this.dagStateStore = createDagStateStore(config, topologySpecMap);
+ this.failedDagStateStore =
createDagStateStore(ConfigUtils.getConfigOrEmpty(config,
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
//On startup, the service creates DagManagerThreads that are scheduled
at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
- DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore,
- queue[i], cancelQueue[i], instrumentationEnabled, defaultQuota,
perUserQuota);
+ DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
+ queue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, defaultQuota, perUserQuota);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
}
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
for (Dag<JobExecutionPlan> dag : dags) {
- addDag(dag, false);
+ addDag(dag, false, false);
}
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager
threads");
@@ -380,6 +399,8 @@ public class DagManager extends AbstractIdleService {
private static final Map<String, Integer> proxyUserToJobCount = new
ConcurrentHashMap<>();
private static final Map<String, Integer> requesterToJobCount = new
ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
+ private final Map<String, Dag<JobExecutionPlan>> failedDags = new
HashMap<>();
+ private final Map<String, Dag<JobExecutionPlan>> resumingDags = new
HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new
HashMap<>();
final Map<String, Long> dagToSLA = new HashMap<>();
@@ -394,19 +415,30 @@ public class DagManager extends AbstractIdleService {
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
+ private DagStateStore failedDagStateStore;
private BlockingQueue<Dag<JobExecutionPlan>> queue;
private BlockingQueue<String> cancelQueue;
+ private BlockingQueue<String> resumeQueue;
/**
* Constructor.
*/
- DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore,
- BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, boolean instrumentationEnabled,
- int defaultQuota, Map<String, Integer> perUserQuota) {
+ DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
+ BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String>
cancelQueue, BlockingQueue<String> resumeQueue,
+ boolean instrumentationEnabled, int defaultQuota, Map<String, Integer>
perUserQuota) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
+ this.failedDagStateStore = failedDagStateStore;
+ try {
+ for (Dag<JobExecutionPlan> dag : failedDagStateStore.getDags()) {
+ this.failedDags.put(DagManagerUtils.generateDagId(dag), dag);
+ }
+ } catch (IOException e) {
+ log.error("Failed to load previously failed dags into memory", e);
+ }
this.queue = queue;
this.cancelQueue = cancelQueue;
+ this.resumeQueue = resumeQueue;
this.defaultQuota = defaultQuota;
this.perUserQuota = perUserQuota;
if (instrumentationEnabled) {
@@ -449,6 +481,13 @@ public class DagManager extends AbstractIdleService {
}
}
+ while (!resumeQueue.isEmpty()) {
+ String dagId = resumeQueue.poll();
+ beginResumingDag(dagId);
+ }
+
+ finishResumingDags();
+
log.debug("Polling job statuses..");
//Poll and update the job statuses of running jobs.
pollAndAdvanceDag();
@@ -463,6 +502,68 @@ public class DagManager extends AbstractIdleService {
}
/**
+ * Begin resuming a dag by setting the status of both the dag and the
failed/cancelled dag nodes to {@link ExecutionStatus#PENDING_RESUME},
+ * and also sending events so that this status will be reflected in the
job status state store.
+ */
+ private void beginResumingDag(String dagId) {
+ Dag<JobExecutionPlan> dag = this.failedDags.get(dagId);
+ if (dag == null) {
+ log.warn("No dag found with dagId " + dagId + ", so cannot resume
flow");
+ return;
+ }
+
+ long flowResumeTime = System.currentTimeMillis();
+
+ // Set the flow and its failed or cancelled nodes to PENDING_RESUME so
that the flow will be resumed from the point before it failed
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag,
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+ for (DagNode<JobExecutionPlan> node : dag.getNodes()) {
+ ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
+ if (executionStatus.equals(FAILED) ||
executionStatus.equals(CANCELLED)) {
+ node.getValue().setExecutionStatus(PENDING_RESUME);
+ }
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
+ // Set flowStartTime so that flow SLA will be based on current time
instead of original flow
+ node.getValue().setFlowStartTime(flowResumeTime);
+ }
+
+ this.resumingDags.put(dagId, dag);
+ }
+
+ /**
+ * Finish resuming dags by first verifying the status is correct (flow
should be {@link ExecutionStatus#PENDING_RESUME}
+ * and jobs should not be {@link ExecutionStatus#FAILED} or {@link
ExecutionStatus#CANCELLED}) and then calling
+ * {@link #initialize}. This is separated from {@link #beginResumingDag}
because it could take some time for the
+ * job status state store to reflect the updated status.
+ */
+ private void finishResumingDags() throws IOException {
+ for (Map.Entry<String, Dag<JobExecutionPlan>> dag :
this.resumingDags.entrySet()) {
+ JobStatus flowStatus = pollFlowStatus(dag.getValue());
+ if (flowStatus == null ||
!flowStatus.getEventName().equals(PENDING_RESUME.name())) {
+ continue;
+ }
+
+ boolean dagReady = true;
+ for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
+ JobStatus jobStatus = pollJobStatus(node);
+ if (jobStatus == null ||
jobStatus.getEventName().equals(FAILED.name()) ||
jobStatus.getEventName().equals(CANCELLED.name())) {
+ dagReady = false;
+ break;
+ }
+ }
+
+ if (dagReady) {
+ this.dagStateStore.writeCheckpoint(dag.getValue());
+ this.failedDagStateStore.cleanUp(dag.getValue());
+ this.failedDags.remove(dag.getKey());
+ this.resumingDags.remove(dag.getKey());
+ initialize(dag.getValue());
+ }
+ }
+ }
+
+ /**
* Cancels the dag and sends a cancellation tracking event.
* @param dagToCancel dag node to cancel
* @throws ExecutionException executionException
@@ -713,6 +814,25 @@ public class DagManager extends AbstractIdleService {
String jobGroup = jobConfig.getString(ConfigurationKeys.JOB_GROUP_KEY);
String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+ return pollStatus(flowGroup, flowName, flowExecutionId, jobGroup,
jobName);
+ }
+
+ /**
+ * Retrieve the flow's {@link JobStatus} (i.e. job status with {@link
JobStatusRetriever#NA_KEY} as job name/group) from a dag
+ */
+ private JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
+ if (dag == null || dag.isEmpty()) {
+ return null;
+ }
+ Config jobConfig =
dag.getNodes().get(0).getValue().getJobSpec().getConfig();
+ String flowGroup = jobConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ long flowExecutionId =
jobConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+
+ return pollStatus(flowGroup, flowName, flowExecutionId,
JobStatusRetriever.NA_KEY, JobStatusRetriever.NA_KEY);
+ }
+
+ private JobStatus pollStatus(String flowGroup, String flowName, long
flowExecutionId, String jobGroup, String jobName) {
long pollStartTime = System.nanoTime();
Iterator<JobStatus> jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId, jobName, jobGroup);
@@ -992,6 +1112,7 @@ public class DagManager extends AbstractIdleService {
List<String> dagIdstoClean = new ArrayList<>();
//Clean up failed dags
for (String dagId : this.failedDagIdsFinishRunning) {
+ addFailedDag(dagId);
//Skip monitoring of any other jobs of the failed dag.
LinkedList<DagNode<JobExecutionPlan>> dagNodeList =
this.dagToJobs.get(dagId);
while (!dagNodeList.isEmpty()) {
@@ -1009,6 +1130,7 @@ public class DagManager extends AbstractIdleService {
if (!hasRunningJobs(dagId) &&
!this.failedDagIdsFinishRunning.contains(dagId)) {
String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
+ addFailedDag(dagId);
status = TimingEvent.FlowTimings.FLOW_FAILED;
this.failedDagIdsFinishAllPossible.remove(dagId);
}
@@ -1025,6 +1147,19 @@ public class DagManager extends AbstractIdleService {
}
/**
+ * Add a dag to failed dag state store
+ */
+ private synchronized void addFailedDag(String dagId) {
+ try {
+ log.info("Adding dag " + dagId + " to failed dag state store");
+ this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
+ } catch (IOException e) {
+ log.error("Failed to add dag " + dagId + " to failed dag state store",
e);
+ }
+ this.failedDags.put(dagId, this.dags.get(dagId));
+ }
+
+ /**
* Note that removal of a {@link Dag} entry in {@link #dags} needs to happen after {@link #cleanUp()},
* since the real {@link Dag} object is required for {@link #cleanUp()},
* and cleaning of all relevant states need to be atomic
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 248fe4d..b55b0e1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -185,7 +185,8 @@ public class DagManagerUtils {
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
- if (executionStatus == ExecutionStatus.PENDING || executionStatus ==
ExecutionStatus.PENDING_RETRY) {
+ if (executionStatus == ExecutionStatus.PENDING || executionStatus ==
ExecutionStatus.PENDING_RETRY
+ || executionStatus == ExecutionStatus.PENDING_RESUME) {
//Add a node to be executed next, only if all of its parent nodes are
COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
@@ -243,12 +244,14 @@ public class DagManagerUtils {
}
/**
- * flow start time is assumed to be same the flow execution id which is
timestamp flow request was received
+ * Flow start time is the same as the flow execution id which is the
timestamp flow request was received, unless it
+ * is a resumed flow, in which case it is {@link
JobExecutionPlan#getFlowStartTime()}
* @param dagNode dag node in context
- * @return flow execution id
+ * @return flow start time
*/
static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
- return getFlowExecId(dagNode);
+ long flowStartTime = dagNode.getValue().getFlowStartTime();
+ return flowStartTime == 0L ? getFlowExecId(dagNode) : flowStartTime;
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index e6afe93..f790361 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -302,7 +302,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
if (this.dagManager.isPresent()) {
//Send the dag to the DagManager.
- this.dagManager.get().addDag(jobExecutionPlanDag, true);
+ this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getNodes()) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 2a94fb0..0e3b8fe 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -37,6 +37,7 @@ import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.FlowStatusId;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
/**
@@ -70,6 +71,18 @@ public class GobblinServiceFlowExecutionResourceHandler
implements FlowExecution
}
@Override
+ public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord>
key) {
+ String flowGroup = key.getKey().getFlowGroup();
+ String flowName = key.getKey().getFlowName();
+ Long flowExecutionId = key.getKey().getFlowExecutionId();
+ if (this.forceLeader) {
+ HelixUtils.throwErrorIfNotLeader(this.helixManager);
+ }
+ this.eventBus.post(new ResumeFlowEvent(flowGroup, flowName,
flowExecutionId));
+ return new UpdateResponse(HttpStatus.S_200_OK);
+ }
+
+ @Override
public UpdateResponse delete(ComplexResourceKey<FlowStatusId, EmptyRecord>
key) {
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index f88b561..0cc4be3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -57,7 +57,7 @@ import static
org.apache.gobblin.runtime.AbstractJobLauncher.GOBBLIN_JOB_TEMPLAT
* where the {@link JobSpec} will be executed.
*/
@Data
-@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts",
"jobFuture"})
+@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts",
"jobFuture", "flowStartTime"})
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
@@ -67,6 +67,7 @@ public class JobExecutionPlan {
private final int maxAttempts;
private int currentAttempts = 0;
private Optional<Future> jobFuture = Optional.absent();
+ private long flowStartTime = 0L;
public static class Factory {
public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index 1f988c2..ede2870 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -111,6 +111,11 @@ public class JobExecutionPlanListDeserializer implements
JsonDeserializer<List<J
JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec,
specExecutor);
jobExecutionPlan.setExecutionStatus(executionStatus);
+ JsonElement flowStartTime =
serializedJobExecutionPlan.get(SerializationConstants.FLOW_START_TIME_KEY);
+ if (flowStartTime != null) {
+ jobExecutionPlan.setFlowStartTime(flowStartTime.getAsLong());
+ }
+
try {
String jobExecutionFuture =
serializedJobExecutionPlan.get(SerializationConstants.JOB_EXECUTION_FUTURE).getAsString();
Future future =
specExecutor.getProducer().get().deserializeAddSpecResponse(jobExecutionFuture);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
index 5a38c19..3d2fe36 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListSerializer.java
@@ -83,6 +83,8 @@ public class JobExecutionPlanListSerializer implements
JsonSerializer<List<JobEx
String executionStatus = jobExecutionPlan.getExecutionStatus().name();
jobExecutionPlanJson.addProperty(SerializationConstants.EXECUTION_STATUS_KEY,
executionStatus);
+
jobExecutionPlanJson.addProperty(SerializationConstants.FLOW_START_TIME_KEY,
jobExecutionPlan.getFlowStartTime());
+
try {
String jobExecutionFuture =
jobExecutionPlan.getSpecExecutor().getProducer().get()
.serializeAddSpecResponse(jobExecutionPlan.getJobFuture().orNull());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
index 5359465..7cccf9f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/SerializationConstants.java
@@ -32,4 +32,5 @@ public class SerializationConstants {
public static final String EXECUTION_STATUS_KEY = "executionStatus";
public static final String JOB_EXECUTION_FUTURE = "jobExecutionFuture";
+ public static final String FLOW_START_TIME_KEY = "flowStartTime";
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index 3ea701c..def4943 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -126,6 +126,10 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
case TimingEvent.LauncherTimings.JOB_PENDING:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.PENDING.name());
break;
+ case TimingEvent.FlowTimings.FLOW_PENDING_RESUME:
+ case TimingEvent.LauncherTimings.JOB_PENDING_RESUME:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.PENDING_RESUME.name());
+ break;
case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.ORCHESTRATED.name());
properties.put(TimingEvent.JOB_ORCHESTRATED_TIME,
properties.getProperty(TimingEvent.METADATA_END_TIME));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 68f48c9..abd15aa 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -84,7 +84,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
private static final List<ExecutionStatus> ORDERED_EXECUTION_STATUSES =
ImmutableList
- .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING,
ExecutionStatus.PENDING_RETRY,
+ .of(ExecutionStatus.COMPILED, ExecutionStatus.PENDING,
ExecutionStatus.PENDING_RESUME, ExecutionStatus.PENDING_RETRY,
ExecutionStatus.ORCHESTRATED, ExecutionStatus.RUNNING,
ExecutionStatus.COMPLETE,
ExecutionStatus.FAILED, ExecutionStatus.CANCELLED);
@@ -173,8 +173,9 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
String previousStatus = states.get(states.size() -
1).getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
- if (previousStatus != null && currentStatus != null &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus))
- <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
+ // PENDING_RESUME is allowed to override, because it occurs when a flow is resumed after previously failing
+ if (previousStatus != null && currentStatus != null &&
!currentStatus.equals(ExecutionStatus.PENDING_RESUME.name())
+ &&
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(currentStatus)) <
ORDERED_EXECUTION_STATUSES.indexOf(ExecutionStatus.valueOf(previousStatus))) {
log.warn(String.format("Received status %s when status is already %s
for flow (%s, %s, %s), job (%s, %s)",
currentStatus, previousStatus, flowGroup, flowName,
flowExecutionId, jobGroup, jobName));
jobStatus = mergeState(states.get(states.size() - 1), jobStatus);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index ecd90fa..f9b8bd9 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -94,9 +94,9 @@ public class DagManagerFlowTest {
.thenReturn(Collections.singletonList(flowExecutionId3));
// mock add spec
- dagManager.addDag(dag1, true);
- dagManager.addDag(dag2, true);
- dagManager.addDag(dag3, true);
+ dagManager.addDag(dag1, true, true);
+ dagManager.addDag(dag2, true, true);
+ dagManager.addDag(dag3, true, true);
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -152,7 +152,7 @@ public class DagManagerFlowTest {
.thenReturn(Collections.singletonList(flowExecutionId));
// mock add spec
- dagManager.addDag(dag, true);
+ dagManager.addDag(dag, true, true);
// check existence of dag in dagToJobs map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -196,7 +196,7 @@ public class DagManagerFlowTest {
dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
// mock add spec
- dagManager.addDag(dag, true);
+ dagManager.addDag(dag, true, true);
// check existence of dag in dagToSLA map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
@@ -233,7 +233,7 @@ public class DagManagerFlowTest {
dag.getStartNodes().get(0).getValue().getJobSpec().setConfig(jobConfig);
// mock add spec
- dagManager.addDag(dag, true);
+ dagManager.addDag(dag, true, true);
// check existence of dag in dagToSLA map
AssertWithBackoff.create().maxSleepMs(5000).backoffFactor(1).
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 60f25d1..a39457f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -63,9 +63,11 @@ public class DagManagerTest {
private DagManager.DagManagerThread _dagManagerThread;
private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
private LinkedBlockingQueue<String> cancelQueue;
+ private LinkedBlockingQueue<String> resumeQueue;
private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
private Map<String, Dag<JobExecutionPlan>> dags;
+ private Map<String, Dag<JobExecutionPlan>> failedDags;
@BeforeClass
public void setUp() throws Exception {
@@ -77,8 +79,9 @@ public class DagManagerTest {
this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
this.queue = new LinkedBlockingQueue<>();
this.cancelQueue = new LinkedBlockingQueue<>();
- this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue,
cancelQueue,
- true, 5, new HashMap<>());
+ this.resumeQueue = new LinkedBlockingQueue<>();
+ this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_dagStateStore, queue, cancelQueue,
+ resumeQueue, true, 5, new HashMap<>());
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
@@ -92,6 +95,9 @@ public class DagManagerTest {
dagsField.setAccessible(true);
this.dags = (Map<String, Dag<JobExecutionPlan>>)
dagsField.get(this._dagManagerThread);
+ Field failedDagsField =
DagManager.DagManagerThread.class.getDeclaredField("failedDags");
+ failedDagsField.setAccessible(true);
+ this.failedDags = (Map<String, Dag<JobExecutionPlan>>)
failedDagsField.get(this._dagManagerThread);
}
/**
@@ -327,6 +333,82 @@ public class DagManagerTest {
}
@Test (dependsOnMethods = "testFailedDag")
+ public void testResumeDag() throws URISyntaxException, IOException {
+ long flowExecutionId = System.currentTimeMillis();
+ String flowGroupId = "0";
+ String flowGroup = "group" + flowGroupId;
+ String flowName = "flow" + flowGroupId;
+ String jobName0 = "job0";
+ String jobName1 = "job1";
+ String jobName2 = "job2";
+
+ Dag<JobExecutionPlan> dag = buildDag(flowGroupId, flowExecutionId,
"FINISH_RUNNING", true);
+ String dagId = DagManagerUtils.generateDagId(dag);
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dag);
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator6 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIterator7 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, "NA_KEY",
"NA_KEY", String.valueOf(ExecutionStatus.PENDING_RESUME));
+ Iterator<JobStatus> jobStatusIterator8 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator9 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName1,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+ Iterator<JobStatus> jobStatusIterator10 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.PENDING_RESUME));
+ Iterator<JobStatus> jobStatusIterator11 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator12 =
+ getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName2,
flowGroup, String.valueOf(ExecutionStatus.COMPLETE));
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.anyString(),
Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(),
Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4).
+ thenReturn(jobStatusIterator5).
+ thenReturn(jobStatusIterator6).
+ thenReturn(jobStatusIterator7).
+ thenReturn(jobStatusIterator8).
+ thenReturn(jobStatusIterator9).
+ thenReturn(jobStatusIterator10).
+ thenReturn(jobStatusIterator11).
+ thenReturn(jobStatusIterator12);
+
+ // Run thread until job2 fails
+ for (int i = 0; i < 4; i++) {
+ this._dagManagerThread.run();
+ }
+
+ Assert.assertTrue(this.failedDags.containsKey(dagId));
+
+ // Resume dag
+ this.resumeQueue.offer(dagId);
+
+ // Job2 rerunning
+ this._dagManagerThread.run();
+ Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertTrue(this.dags.containsKey(dagId));
+
+ // Job2 complete
+ this._dagManagerThread.run();
+ Assert.assertFalse(this.failedDags.containsKey(dagId));
+ Assert.assertFalse(this.dags.containsKey(dagId));
+ }
+
+ @Test (dependsOnMethods = "testResumeDag")
public void testSucceedAfterRetry() throws Exception {
long flowExecutionId = System.currentTimeMillis();
String flowGroupId = "0";