This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 70e480fe7 Consistently handle Rest.li /flowexecutions KILL and RESUME 
actions (#3830)
70e480fe7 is described below

commit 70e480fe74ff37f6d7cdacbde8945095a55169ff
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Nov 20 14:22:30 2023 -0800

    Consistently handle Rest.li /flowexecutions KILL and RESUME actions (#3830)
---
 ...lowExecutionResourceHandlerWithWarmStandby.java | 60 ++++++++--------------
 1 file changed, 22 insertions(+), 38 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 0b5d1cdc7..f391e8fe7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -29,9 +29,11 @@ import java.io.IOException;
 import java.sql.SQLException;
 import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
+import org.apache.gobblin.service.FlowStatusId;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.helix.HelixManager;
 
@@ -46,56 +48,38 @@ public class 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
     this.dagActionStore = dagActionStore;
   }
 
+  @Override
+  public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+    FlowStatusId id = key.getKey();
+    addDagAction(id.getFlowGroup(), id.getFlowName(), id.getFlowExecutionId(), 
DagActionStore.FlowActionType.RESUME);
+  }
 
   @Override
-  public void 
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> 
key) {
-    String flowGroup = key.getKey().getFlowGroup();
-    String flowName = key.getKey().getFlowName();
-    Long flowExecutionId = key.getKey().getFlowExecutionId();
+  public UpdateResponse 
delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> 
key) {
+    FlowStatusId id = key.getKey();
+    addDagAction(id.getFlowGroup(), id.getFlowName(), id.getFlowExecutionId(), 
DagActionStore.FlowActionType.KILL);
+    return new UpdateResponse(HttpStatus.S_200_OK);
+  }
+
+  /** NOTE: may throw {@link RestLiServiceException}, see: 
https://linkedin.github.io/rest.li/user_guide/restli_server#returning-errors */
+  protected void addDagAction(String flowGroup, String flowName, Long 
flowExecutionId, DagActionStore.FlowActionType actionType) {
     try {
       // If an existing resume request is still pending then do not accept 
this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
-        this.prepareError("There is already a pending RESUME action for this 
flow. Please wait to resubmit and wait "
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), actionType)) {
+        this.throwErrorResponse("There is already a pending " + actionType + " 
action for this flow. Please wait to resubmit and wait "
             + "for action to be completed.", HttpStatus.S_409_CONFLICT);
         return;
       }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), actionType);
     } catch (IOException | SQLException e) {
       log.warn(
-          String.format("Failed to add execution resume action for flow %s %s 
%s to dag action store due to", flowGroup,
+          String.format("Failed to add %s action for flow %s %s %s to dag 
action store due to:", actionType, flowGroup,
               flowName, flowExecutionId), e);
-      this.prepareError(e.getMessage(), 
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
-    }
-
-  }
-
-  private void prepareError(String exceptionMessage, HttpStatus errorType) {
-    if (errorType == HttpStatus.S_409_CONFLICT) {
-      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, 
exceptionMessage);
-    } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
-      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST);
+      this.throwErrorResponse(e.getMessage(), 
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
     }
-    throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, 
exceptionMessage);
   }
 
-  @Override
-  public UpdateResponse 
delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord> 
key) {
-
-    String flowGroup = key.getKey().getFlowGroup();
-    String flowName = key.getKey().getFlowName();
-    Long flowExecutionId = key.getKey().getFlowExecutionId();
-    try {
-      // If an existing kill request is still pending then do not accept this 
request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL)) {
-        this.prepareError("There is already a pending KILL action for this 
flow. Please wait to resubmit and wait "
-            + "for action to be completed.", HttpStatus.S_400_BAD_REQUEST);
-      }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL);
-      return new UpdateResponse(HttpStatus.S_200_OK);
-    } catch (IOException | SQLException e) {
-      this.prepareError(String.format("Failed to add execution delete action 
for flow %s %s %s to dag action store due to", flowGroup,
-          flowName, flowExecutionId), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
-      return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
-    }
+  private void throwErrorResponse(String exceptionMessage, HttpStatus 
errorType) {
+    throw StringUtils.isBlank(exceptionMessage) ? new 
RestLiServiceException(errorType) : new RestLiServiceException(errorType, 
exceptionMessage);
   }
 }

Reply via email to