This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 01136a5 [GOBBLIN-1370] Switch flow resume to be a restli action
01136a5 is described below
commit 01136a5b3a40eca85cf564a99c63a1ed98e2671d
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Jan 26 14:57:17 2021 -0800
[GOBBLIN-1370] Switch flow resume to be a restli action
Closes #3211 from jack-moseley/resume-action
---
...he.gobblin.service.flowexecutions.restspec.json | 11 +++++----
...he.gobblin.service.flowexecutions.snapshot.json | 11 +++++----
.../gobblin/service/FlowExecutionClient.java | 20 ++++------------
.../gobblin/service/FlowExecutionResource.java | 28 +++++++---------------
.../service/FlowExecutionResourceHandler.java | 11 +--------
.../service/FlowExecutionResourceLocalHandler.java | 2 +-
.../service/modules/orchestration/DagManager.java | 4 ++--
...GobblinServiceFlowExecutionResourceHandler.java | 3 +--
8 files changed, 31 insertions(+), 59 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 4af7b37..67427d2 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -10,14 +10,11 @@
"type" : "org.apache.gobblin.service.FlowStatusId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
- "supports" : [ "delete", "get", "partial_update" ],
+ "supports" : [ "delete", "get" ],
"methods" : [ {
"method" : "get",
"doc" : "Retrieve the FlowExecution with the given key"
}, {
- "method" : "partial_update",
- "doc" : "Resume a failed {@link FlowExecution} from the point before failure. This is specified by a partial update patch which\n sets executionStatus to RUNNING."
- }, {
"method" : "delete",
"doc" : "Kill the FlowExecution with the given key"
} ],
@@ -41,7 +38,11 @@
} ]
} ],
"entity" : {
- "path" : "/flowexecutions/{id}"
+ "path" : "/flowexecutions/{id}",
+ "actions" : [ {
+ "name" : "resume",
+ "doc" : "Resume a failed {@link FlowExecution} from the point before failure."
+ } ]
}
}
}
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 97d67f8..734c2ac 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -216,14 +216,11 @@
"type" : "org.apache.gobblin.service.FlowStatusId",
"params" : "com.linkedin.restli.common.EmptyRecord"
},
- "supports" : [ "delete", "get", "partial_update" ],
+ "supports" : [ "delete", "get" ],
"methods" : [ {
"method" : "get",
"doc" : "Retrieve the FlowExecution with the given key"
}, {
- "method" : "partial_update",
- "doc" : "Resume a failed {@link FlowExecution} from the point before failure. This is specified by a partial update patch which\n sets executionStatus to RUNNING."
- }, {
"method" : "delete",
"doc" : "Kill the FlowExecution with the given key"
} ],
@@ -247,7 +244,11 @@
} ]
} ],
"entity" : {
- "path" : "/flowexecutions/{id}"
+ "path" : "/flowexecutions/{id}",
+ "actions" : [ {
+ "name" : "resume",
+ "doc" : "Resume a failed {@link FlowExecution} from the point before failure."
+ } ]
}
}
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index 8398c54..cb9b1b3 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -17,14 +17,11 @@
package org.apache.gobblin.service;
-import com.linkedin.data.DataMap;
-import com.linkedin.restli.client.DeleteRequest;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,17 +33,15 @@ import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.ActionRequest;
+import com.linkedin.restli.client.DeleteRequest;
import com.linkedin.restli.client.FindRequest;
import com.linkedin.restli.client.GetRequest;
-import com.linkedin.restli.client.PartialUpdateRequest;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.RestClient;
-import com.linkedin.restli.client.UpdateRequest;
import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.PatchRequest;
-import com.linkedin.restli.internal.server.util.DataMapUtils;
/**
@@ -177,15 +172,10 @@ public class FlowExecutionClient implements Closeable {
LOG.debug("resumeFlowExecution with groupName " +
flowStatusId.getFlowGroup() + " flowName " +
flowStatusId.getFlowName() + " flowExecutionId " +
flowStatusId.getFlowExecutionId());
- String patchJson = "{\"$set\":{\"executionStatus\":\"RUNNING\"}}";
- DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
- PatchRequest<FlowExecution> flowExecutionPatch = PatchRequest.createFromPatchDocument(dataMap);
-
- PartialUpdateRequest<FlowExecution> partialUpdateRequest =
- _flowexecutionsRequestBuilders.partialUpdate().id(new ComplexResourceKey<>(flowStatusId, new EmptyRecord()))
- .input(flowExecutionPatch).build();
+ ActionRequest<Void> resumeRequest = _flowexecutionsRequestBuilders.actionResume()
+ .id(new ComplexResourceKey<>(flowStatusId, new EmptyRecord())).build();
- FlowClientUtils.sendRequestWithRetry(_restClient.get(), partialUpdateRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
+ FlowClientUtils.sendRequestWithRetry(_restClient.get(), resumeRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 475482a..9b6c7cf 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -20,15 +20,17 @@ package org.apache.gobblin.service;
import java.util.List;
import com.google.inject.Inject;
-import com.linkedin.data.DataMap;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.PathKeys;
+import com.linkedin.restli.server.ResourceLevel;
import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.Action;
import com.linkedin.restli.server.annotations.Context;
import com.linkedin.restli.server.annotations.Finder;
import com.linkedin.restli.server.annotations.Optional;
+import com.linkedin.restli.server.annotations.PathKeysParam;
import com.linkedin.restli.server.annotations.QueryParam;
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
@@ -63,24 +65,12 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
}
/**
- * Resume a failed {@link FlowExecution} from the point before failure. This is specified by a partial update patch which
- * sets executionStatus to RUNNING.
- * @param key {@link FlowStatusId} of flow to resume
- * @param flowExecutionPatch {@link PatchRequest} which is expected to set executionStatus to RUNNING
- * @return {@link UpdateResponse}
+ * Resume a failed {@link FlowExecution} from the point before failure.
+ * @param pathKeys key of {@link FlowExecution} specified in path
*/
- @Override
- public UpdateResponse update(ComplexResourceKey<FlowStatusId, EmptyRecord> key, PatchRequest<FlowExecution> flowExecutionPatch) {
- DataMap dataMap = flowExecutionPatch.getPatchDocument().getDataMap("$set");
- if (dataMap != null) {
- String status = dataMap.getString("executionStatus");
- if (status != null && status.equalsIgnoreCase(ExecutionStatus.RUNNING.name())) {
- return this.flowExecutionResourceHandler.resume(key);
- }
- }
-
- throw new UnsupportedOperationException("Only flow resume is supported for FlowExecution update, which is specified by "
- + "setting executionStatus field to RUNNING");
+ @Action(name="resume",resourceLevel= ResourceLevel.ENTITY)
+ public void resume(@PathKeysParam PathKeys pathKeys) {
+ this.flowExecutionResourceHandler.resume(pathKeys.get("id"));
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
index e79eb5f..83737c4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -17,21 +17,12 @@
package org.apache.gobblin.service;
-import java.util.Collection;
import java.util.List;
-import java.util.Properties;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.PatchRequest;
-import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.annotations.Context;
-import com.linkedin.restli.server.annotations.Optional;
-import com.linkedin.restli.server.annotations.QueryParam;
-
-import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
public interface FlowExecutionResourceHandler {
@@ -48,7 +39,7 @@ public interface FlowExecutionResourceHandler {
/**
* Resume a failed {@link FlowExecution} from the point before failure
*/
- public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
+ public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
/**
* Kill a running {@link FlowExecution}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index d83f70d..b6d30b8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -69,7 +69,7 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
}
@Override
- public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+ public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
throw new UnsupportedOperationException("Resume should be handled in GobblinServiceFlowConfigResourceHandler");
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a925450..bd22f74 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -520,9 +520,9 @@ public class DagManager extends AbstractIdleService {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
if (executionStatus.equals(FAILED) ||
executionStatus.equals(CANCELLED)) {
node.getValue().setExecutionStatus(PENDING_RESUME);
+ Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+ this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
- Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
- this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
// Set flowStartTime so that flow SLA will be based on current time instead of original flow
node.getValue().setFlowStartTime(flowResumeTime);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 0e3b8fe..538fc57 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -71,7 +71,7 @@ public class GobblinServiceFlowExecutionResourceHandler implements FlowExecution
}
@Override
- public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+ public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
@@ -79,7 +79,6 @@ public class GobblinServiceFlowExecutionResourceHandler implements FlowExecution
HelixUtils.throwErrorIfNotLeader(this.helixManager);
}
this.eventBus.post(new ResumeFlowEvent(flowGroup, flowName,
flowExecutionId));
- return new UpdateResponse(HttpStatus.S_200_OK);
}
@Override