phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1222198550


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void 
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
-      // If an existing resume or kill request is still pending then do not 
accept this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
-        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
-            new RuntimeException("There is already a pending action " + action 
+ " for this flow. Please wait to resubmit and wait for"
+      // If an existing resume request is still pending then do not accept 
this request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+        this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME,
+            new RuntimeException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait for"
                 + " action to be completed."));
         return;
       }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
-    } catch (IOException | SQLException | SpecNotFoundException e) {
+      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+    } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add execution resume action for flow %s %s 
%s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
-      this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+      this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME, e);
     }
 
   }
 
-  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, Exception e) {
+  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
     try {
-      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId)) {
+      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
flowActionType)) {

Review Comment:
   having a separate, subsequent, and repeated check of `exists` looks like a 
race condition.  instead whoever calls `handleException` should indicate the 
result of the first (and ideally only) call to `exists`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to