This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 01136a5  [GOBBLIN-1370] Switch flow resume to be a restli action
01136a5 is described below

commit 01136a5b3a40eca85cf564a99c63a1ed98e2671d
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Jan 26 14:57:17 2021 -0800

    [GOBBLIN-1370] Switch flow resume to be a restli action
    
    Closes #3211 from jack-moseley/resume-action
---
 ...he.gobblin.service.flowexecutions.restspec.json | 11 +++++----
 ...he.gobblin.service.flowexecutions.snapshot.json | 11 +++++----
 .../gobblin/service/FlowExecutionClient.java       | 20 ++++------------
 .../gobblin/service/FlowExecutionResource.java     | 28 +++++++---------------
 .../service/FlowExecutionResourceHandler.java      | 11 +--------
 .../service/FlowExecutionResourceLocalHandler.java |  2 +-
 .../service/modules/orchestration/DagManager.java  |  4 ++--
 ...GobblinServiceFlowExecutionResourceHandler.java |  3 +--
 8 files changed, 31 insertions(+), 59 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
index 4af7b37..67427d2 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowexecutions.restspec.json
@@ -10,14 +10,11 @@
       "type" : "org.apache.gobblin.service.FlowStatusId",
       "params" : "com.linkedin.restli.common.EmptyRecord"
     },
-    "supports" : [ "delete", "get", "partial_update" ],
+    "supports" : [ "delete", "get" ],
     "methods" : [ {
       "method" : "get",
       "doc" : "Retrieve the FlowExecution with the given key"
     }, {
-      "method" : "partial_update",
-      "doc" : "Resume a failed {@link FlowExecution} from the point before failure. This is specified by a partial update patch which\n sets executionStatus to RUNNING."
-    }, {
       "method" : "delete",
       "doc" : "Kill the FlowExecution with the given key"
     } ],
@@ -41,7 +38,11 @@
       } ]
     } ],
     "entity" : {
-      "path" : "/flowexecutions/{id}"
+      "path" : "/flowexecutions/{id}",
+      "actions" : [ {
+        "name" : "resume",
+        "doc" : "Resume a failed {@link FlowExecution} from the point before failure."
+      } ]
     }
   }
 }
\ No newline at end of file
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
index 97d67f8..734c2ac 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowexecutions.snapshot.json
@@ -216,14 +216,11 @@
         "type" : "org.apache.gobblin.service.FlowStatusId",
         "params" : "com.linkedin.restli.common.EmptyRecord"
       },
-      "supports" : [ "delete", "get", "partial_update" ],
+      "supports" : [ "delete", "get" ],
       "methods" : [ {
         "method" : "get",
         "doc" : "Retrieve the FlowExecution with the given key"
       }, {
-        "method" : "partial_update",
-        "doc" : "Resume a failed {@link FlowExecution} from the point before failure. This is specified by a partial update patch which\n sets executionStatus to RUNNING."
-      }, {
         "method" : "delete",
         "doc" : "Kill the FlowExecution with the given key"
       } ],
@@ -247,7 +244,11 @@
         } ]
       } ],
       "entity" : {
-        "path" : "/flowexecutions/{id}"
+        "path" : "/flowexecutions/{id}",
+        "actions" : [ {
+          "name" : "resume",
+          "doc" : "Resume a failed {@link FlowExecution} from the point before failure."
+        } ]
       }
     }
   }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
index 8398c54..cb9b1b3 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowExecutionClient.java
@@ -17,14 +17,11 @@
 
 package org.apache.gobblin.service;
 
-import com.linkedin.data.DataMap;
-import com.linkedin.restli.client.DeleteRequest;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,17 +33,15 @@ import com.linkedin.r2.RemoteInvocationException;
 import com.linkedin.r2.transport.common.Client;
 import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
+import com.linkedin.restli.client.ActionRequest;
+import com.linkedin.restli.client.DeleteRequest;
 import com.linkedin.restli.client.FindRequest;
 import com.linkedin.restli.client.GetRequest;
-import com.linkedin.restli.client.PartialUpdateRequest;
 import com.linkedin.restli.client.Response;
 import com.linkedin.restli.client.RestClient;
-import com.linkedin.restli.client.UpdateRequest;
 import com.linkedin.restli.common.CollectionResponse;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.PatchRequest;
-import com.linkedin.restli.internal.server.util.DataMapUtils;
 
 
 /**
@@ -177,15 +172,10 @@ public class FlowExecutionClient implements Closeable {
     LOG.debug("resumeFlowExecution with groupName " + flowStatusId.getFlowGroup() + " flowName " +
         flowStatusId.getFlowName() + " flowExecutionId " + flowStatusId.getFlowExecutionId());
 
-    String patchJson = "{\"$set\":{\"executionStatus\":\"RUNNING\"}}";
-    DataMap dataMap = DataMapUtils.readMap(IOUtils.toInputStream(patchJson));
-    PatchRequest<FlowExecution> flowExecutionPatch = PatchRequest.createFromPatchDocument(dataMap);
-
-    PartialUpdateRequest<FlowExecution> partialUpdateRequest =
-        _flowexecutionsRequestBuilders.partialUpdate().id(new ComplexResourceKey<>(flowStatusId, new EmptyRecord()))
-            .input(flowExecutionPatch).build();
+    ActionRequest<Void> resumeRequest = _flowexecutionsRequestBuilders.actionResume()
+        .id(new ComplexResourceKey<>(flowStatusId, new EmptyRecord())).build();
 
-    FlowClientUtils.sendRequestWithRetry(_restClient.get(), partialUpdateRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
+    FlowClientUtils.sendRequestWithRetry(_restClient.get(), resumeRequest, FlowexecutionsRequestBuilders.getPrimaryResource());
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 475482a..9b6c7cf 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -20,15 +20,17 @@ package org.apache.gobblin.service;
 import java.util.List;
 
 import com.google.inject.Inject;
-import com.linkedin.data.DataMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.PathKeys;
+import com.linkedin.restli.server.ResourceLevel;
 import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.Action;
 import com.linkedin.restli.server.annotations.Context;
 import com.linkedin.restli.server.annotations.Finder;
 import com.linkedin.restli.server.annotations.Optional;
+import com.linkedin.restli.server.annotations.PathKeysParam;
 import com.linkedin.restli.server.annotations.QueryParam;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
@@ -63,24 +65,12 @@ public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatus
   }
 
   /**
-   * Resume a failed {@link FlowExecution} from the point before failure. This is specified by a partial update patch which
-   * sets executionStatus to RUNNING.
-   * @param key {@link FlowStatusId} of flow to resume
-   * @param flowExecutionPatch {@link PatchRequest} which is expected to set executionStatus to RUNNING
-   * @return {@link UpdateResponse}
+   * Resume a failed {@link FlowExecution} from the point before failure.
+   * @param pathKeys key of {@link FlowExecution} specified in path
    */
-  @Override
-  public UpdateResponse update(ComplexResourceKey<FlowStatusId, EmptyRecord> key, PatchRequest<FlowExecution> flowExecutionPatch) {
-    DataMap dataMap = flowExecutionPatch.getPatchDocument().getDataMap("$set");
-    if (dataMap != null) {
-      String status = dataMap.getString("executionStatus");
-      if (status != null && status.equalsIgnoreCase(ExecutionStatus.RUNNING.name())) {
-        return this.flowExecutionResourceHandler.resume(key);
-      }
-    }
-
-    throw new UnsupportedOperationException("Only flow resume is supported for FlowExecution update, which is specified by "
-        + "setting executionStatus field to RUNNING");
+  @Action(name="resume",resourceLevel= ResourceLevel.ENTITY)
+  public void resume(@PathKeysParam PathKeys pathKeys) {
+    this.flowExecutionResourceHandler.resume(pathKeys.get("id"));
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
index e79eb5f..83737c4 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceHandler.java
@@ -17,21 +17,12 @@
 
 package org.apache.gobblin.service;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.Properties;
 
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
-import com.linkedin.restli.common.PatchRequest;
-import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.annotations.Context;
-import com.linkedin.restli.server.annotations.Optional;
-import com.linkedin.restli.server.annotations.QueryParam;
-
-import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
 
 
 public interface FlowExecutionResourceHandler {
@@ -48,7 +39,7 @@ public interface FlowExecutionResourceHandler {
   /**
    * Resume a failed {@link FlowExecution} from the point before failure
    */
-  public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
+  public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key);
 
   /**
    * Kill a running {@link FlowExecution}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index d83f70d..b6d30b8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -69,7 +69,7 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
   }
 
   @Override
-  public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+  public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
     throw new UnsupportedOperationException("Resume should be handled in GobblinServiceFlowConfigResourceHandler");
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index a925450..bd22f74 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -520,9 +520,9 @@ public class DagManager extends AbstractIdleService {
         ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
         if (executionStatus.equals(FAILED) || executionStatus.equals(CANCELLED)) {
           node.getValue().setExecutionStatus(PENDING_RESUME);
+          Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
+          this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
         }
-        Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
-        this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
 
         // Set flowStartTime so that flow SLA will be based on current time instead of original flow
         node.getValue().setFlowStartTime(flowResumeTime);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 0e3b8fe..538fc57 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -71,7 +71,7 @@ public class GobblinServiceFlowExecutionResourceHandler implements FlowExecution
   }
 
   @Override
-  public UpdateResponse resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+  public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
     String flowGroup = key.getKey().getFlowGroup();
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
@@ -79,7 +79,6 @@ public class GobblinServiceFlowExecutionResourceHandler implements FlowExecution
       HelixUtils.throwErrorIfNotLeader(this.helixManager);
     }
     this.eventBus.post(new ResumeFlowEvent(flowGroup, flowName, flowExecutionId));
-    return new UpdateResponse(HttpStatus.S_200_OK);
   }
 
   @Override

Reply via email to