phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1753022115


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -187,9 +194,59 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
    * @param allowConcurrentExecution
    * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
    */
-  private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowGroup, String flowName,
-      boolean allowConcurrentExecution, long flowExecutionId) {
-    return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+  private boolean isExecutionPermitted(String flowGroup, String flowName, boolean allowConcurrentExecution)
+      throws IOException {
+    return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName, dagManagementStateStore);
+  }
+
+  /**
+   * Returns true if any previous execution for the flow determined by the provided flowGroup, flowName is running.
+   * We ignore the execution that has the provided flowExecutionId. We also ignore the flows that are running beyond

Review Comment:
   In the comment, please elaborate on:
   > We ignore the execution that has the provided flowExecutionId.

   Specifically, explain that this handles a re-attempt of the same `DagAction`, as described in the discussion here.
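A minimal sketch of the behavior the reviewer wants documented, with one way the expanded Javadoc could read. It assumes hypothetical, simplified stand-ins: `ExecStatus` and `FlowStatusView` are not Gobblin's `ExecutionStatus`/`FlowStatus`, and each status carries a single pre-resolved `deadlineMillis` instead of separate job start and flow finish deadlines. The method uses the reviewer's suggested name `isPriorFlowExecutionRunning`.

```java
import java.util.List;

// Hypothetical, simplified stand-in for Gobblin's ExecutionStatus.
enum ExecStatus {
  COMPILED, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED;

  boolean isTerminal() {
    return this == COMPLETE || this == FAILED || this == CANCELLED;
  }
}

// Hypothetical, simplified stand-in for Gobblin's FlowStatus.
final class FlowStatusView {
  final long flowExecutionId;
  final ExecStatus status;
  // Assumption: the applicable start/finish deadline, already resolved to epoch millis.
  final long deadlineMillis;

  FlowStatusView(long flowExecutionId, ExecStatus status, long deadlineMillis) {
    this.flowExecutionId = flowExecutionId;
    this.status = status;
    this.deadlineMillis = deadlineMillis;
  }
}

final class PriorExecutionCheck {
  /**
   * Returns true if any execution of the flow, other than the one identified by
   * {@code flowExecutionId}, is still non-terminal and within its deadline.
   *
   * <p>The execution with the provided {@code flowExecutionId} is ignored on purpose: if a
   * status already exists for it, this call is a re-attempt of the same {@code DagAction}
   * (for example, a {@code LAUNCH} whose prior attempt could not complete the required action).
   * Counting that status would make the execution block its own re-attempt.
   *
   * <p>Executions running beyond their deadline are also ignored.
   */
  static boolean isPriorFlowExecutionRunning(List<FlowStatusView> statuses, long flowExecutionId, long nowMillis) {
    for (FlowStatusView s : statuses) {
      if (s.flowExecutionId == flowExecutionId) {
        continue; // same execution: a re-attempt of the same DagAction
      }
      if (s.status.isTerminal() || nowMillis > s.deadlineMillis) {
        continue; // already finished, or running past its deadline
      }
      return true;
    }
    return false;
  }
}
```

The single `deadlineMillis` field is purely a simplification; the original Javadoc refers to both a job start deadline and a flow finish deadline.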



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -195,19 +196,19 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
    * @param allowConcurrentExecution
    * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
    */
-  private boolean isExecutionPermitted(String flowGroup, String flowName, boolean allowConcurrentExecution)
+  private boolean isExecutionPermitted(String flowGroup, String flowName, long flowExecutionId, boolean allowConcurrentExecution)
       throws IOException {
-    return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName, dagManagementStateStore);
+    return allowConcurrentExecution || !isFlowBeforeThisExecutionRunning(flowGroup, flowName, flowExecutionId, dagManagementStateStore);
   }
 
   /**
-   * Returns true if any previous execution for the flow determined by the provided flowGroup, flowName is running.
+   * Returns true if any previous execution for the flow determined by the provided flowGroup, flowName, flowExecutionId is running.
    * We ignore the execution that has the provided flowExecutionId. We also ignore the flows that are running beyond
    * the job start deadline and flow finish deadline.
    * If this method returns `false`, callers may start a flow and subsequent calls to this method may return `true`.
    */
   @VisibleForTesting
-  static boolean isFlowRunning(String flowGroup, String flowName, DagManagementStateStore dagManagementStateStore)
+  static boolean isFlowBeforeThisExecutionRunning(String flowGroup, String flowName, long flowExecutionId, DagManagementStateStore dagManagementStateStore)

Review Comment:
   NBD, but suggest `isPriorFlowExecutionRunning`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -165,7 +165,8 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
     }
     addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
 
-    if (isExecutionPermitted(flowGroup, flowName, allowConcurrentExecution)) {
+    if (isExecutionPermitted(flowGroup, flowName,
+        Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)), allowConcurrentExecution)) {

Review Comment:
   Suggest extracting the flow execution ID along with the other values at the start of:
   ```
   public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec)
       throws IOException, InterruptedException {
     Config flowConfig = flowSpec.getConfig();
     String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
     ...
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -216,6 +217,12 @@ static boolean isFlowRunning(String flowGroup, String flowName, DagManagementSta
     }
 
     for (FlowStatus flowStatus : flowStatusList) {
+      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+        // a duplicate call to this method indicates that the prior caller of this method could not complete the required action,
+        // so we ignore any flow status for the current execution to give the caller another chance to complete them
+        continue;

Review Comment:
   Given that we expect re-attempts to be rare, let's at least log this condition. We really expect it only in the `COMPILED` state, so let's use `.info` level there; any other status merits `.warn`.
   
   Perhaps if the state is actually `RUNNING`, it should even be `.error` level.
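A minimal sketch of the suggested level selection, assuming a hypothetical, simplified `ExecStatus` enum in place of Gobblin's `ExecutionStatus`. The reviewer's `.info`/`.warn`/`.error` read like SLF4J-style logger calls; to stay dependency-free, this sketch uses `java.util.logging` and maps them to `INFO`, `WARNING`, and `SEVERE`. `ReattemptLogging`, `levelFor`, and `logSkippedStatus` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for Gobblin's ExecutionStatus.
enum ExecStatus { COMPILED, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED }

final class ReattemptLogging {
  private static final Logger LOG = Logger.getLogger(ReattemptLogging.class.getName());

  // Chooses the level for a skipped status that belongs to the execution being validated.
  static Level levelFor(ExecStatus status) {
    switch (status) {
      case COMPILED:
        return Level.INFO;    // expected: a re-attempt of the same DagAction
      case RUNNING:
        return Level.SEVERE;  // the "same" execution is already running
      default:
        return Level.WARNING; // any other status is unexpected for a re-attempt
    }
  }

  static void logSkippedStatus(String flowGroup, String flowName, long flowExecutionId, ExecStatus status) {
    LOG.log(levelFor(status), String.format(
        "Ignoring %s status of the execution being validated (flowGroup=%s, flowName=%s, flowExecutionId=%d);"
            + " likely a re-attempt of the same DagAction",
        status, flowGroup, flowName, flowExecutionId));
  }
}
```

In the helper, a call like this would sit just before the `continue` in the loop, so every skipped status leaves a trace at a level proportional to how surprising it is.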



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java:
##########
@@ -156,48 +161,61 @@ public void testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithi
     String flowGroup = "fg";
     String flowName = "fn";
     long flowFinishDeadline = 10000L;
-    long flowStartTime = System.currentTimeMillis();  // giving test flowFinishDeadline to finish
+    long flowStartTime = System.currentTimeMillis() - 1 ;  // giving test flowFinishDeadline + 1 ms to finish
+    long currentFlowExecutionId = System.currentTimeMillis() ;
 
     insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.RUNNING,
         ConfigFactory.empty()
             .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
             .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
 
-    Assert.assertTrue(FlowCompilationValidationHelper.isFlowRunning(flowGroup, flowName, this.dagManagementStateStore));
+    Assert.assertTrue(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup, flowName,
+        currentFlowExecutionId, this.dagManagementStateStore));
   }
 
   @Test
   public void testConcurrentFlowNoPreviousExecutionRunning() throws IOException, URISyntaxException {
     String flowGroup = "fg";
     String flowName = "fn";
-    long currentFlowExecutionId = 67890L;
-    long flowFinishDeadline = 10000L;
     long flowStartTime = System.currentTimeMillis();  // giving test flowFinishDeadline to finish
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.PENDING,
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowStartTime)));
+
+    // change the mock to not return any previous flow status
     when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(Collections.emptyList());
-    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", currentFlowExecutionId,
-        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", ConfigFactory.empty()
-            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
-            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
-            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
-            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowFinishDeadline))
-            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef(
-                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
-    dag.getNodes().forEach(node -> node.getValue().setFlowStartTime(flowStartTime));
-    this.dagManagementStateStore.addDag(dag);
 
-    Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup, flowName, this.dagManagementStateStore));
+    Assert.assertFalse(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup, flowName,
+        flowStartTime, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowCurrentExecutionWithNonTerminalStatusRunningWithinJobStartDeadline() throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long jobStartDeadline = 10000L;
+    long flowStartTime = System.currentTimeMillis();
+
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, ExecutionStatus.PENDING,
+        ConfigFactory.empty()

Review Comment:
   In this case, where the only flow status found is for the execution currently being validated, that would arise from a re-attempt of the same `DagActionType.LAUNCH`, correct? If so, would we expect `ExecutionStatus.COMPILED` or actually `PENDING`? If the former, let's ensure we have a test case for that.
   
   Also, I suggest a method name that makes clear this is not actually a "concurrent" execution but the "same" one, maybe `testSameFlowExecAlreadyCompiledWithinJobStartDeadline()`.
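A dependency-free sketch of the suggested test case, assuming simplified stand-ins rather than the real `DagManagementStateStore` mock: `ExecStatus`, `FlowStatusView`, `SameExecutionTestSketch`, and its `isPriorFlowExecutionRunning(...)` are hypothetical, with the deadline reduced to one precomputed timestamp. It uses a fixed clock instead of `System.currentTimeMillis()`, so the outcome does not depend on how fast the test runs. The contrast case is also hypothetical and is included only to show that the skip applies to the same execution, not to every `COMPILED` status.

```java
import java.util.List;

// Hypothetical, simplified stand-in for Gobblin's ExecutionStatus.
enum ExecStatus {
  COMPILED, PENDING, RUNNING, COMPLETE, FAILED, CANCELLED;

  boolean isTerminal() {
    return this == COMPLETE || this == FAILED || this == CANCELLED;
  }
}

// Hypothetical status record: execution id, state, and an absolute deadline in epoch millis.
final class FlowStatusView {
  final long flowExecutionId;
  final ExecStatus status;
  final long deadlineMillis;

  FlowStatusView(long flowExecutionId, ExecStatus status, long deadlineMillis) {
    this.flowExecutionId = flowExecutionId;
    this.status = status;
    this.deadlineMillis = deadlineMillis;
  }
}

final class SameExecutionTestSketch {
  // Hypothetical check: is any execution other than flowExecutionId non-terminal and within its deadline?
  static boolean isPriorFlowExecutionRunning(List<FlowStatusView> statuses, long flowExecutionId, long nowMillis) {
    for (FlowStatusView s : statuses) {
      if (s.flowExecutionId != flowExecutionId && !s.status.isTerminal() && nowMillis <= s.deadlineMillis) {
        return true;
      }
    }
    return false;
  }

  // Suggested case: the only status is COMPILED and belongs to the execution being validated,
  // as expected after a re-attempt of the same LAUNCH.
  static void testSameFlowExecAlreadyCompiledWithinJobStartDeadline() {
    long now = 1_000_000L;          // fixed clock instead of System.currentTimeMillis()
    long jobStartDeadline = 10_000L;
    long flowExecutionId = now;     // the existing tests also derive ids from the clock
    List<FlowStatusView> statuses =
        List.of(new FlowStatusView(flowExecutionId, ExecStatus.COMPILED, now + jobStartDeadline));
    if (isPriorFlowExecutionRunning(statuses, flowExecutionId, now)) {
      throw new AssertionError("a re-attempt must not be blocked by its own COMPILED status");
    }
  }

  // Hypothetical contrast case: an earlier execution in COMPILED within its deadline does block.
  static void testPriorFlowExecAlreadyCompiledWithinJobStartDeadline() {
    long now = 1_000_000L;
    long jobStartDeadline = 10_000L;
    List<FlowStatusView> statuses =
        List.of(new FlowStatusView(now - 1, ExecStatus.COMPILED, now + jobStartDeadline));
    if (!isPriorFlowExecutionRunning(statuses, now, now)) {
      throw new AssertionError("an earlier non-terminal execution should count as running");
    }
  }
}
```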



-- 
This is an automated message from the Apache Git Service.