[ 
https://issues.apache.org/jira/browse/GOBBLIN-2151?focusedWorklogId=934130&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934130
 ]

ASF GitHub Bot logged work on GOBBLIN-2151:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Sep/24 02:30
            Start Date: 11/Sep/24 02:30
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1753022115


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -187,9 +194,59 @@ public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Conf
    * @param allowConcurrentExecution
    * @return true if the {@link FlowSpec} allows concurrent executions or if 
no other instance of the flow is currently RUNNING.
    */
-  private boolean isExecutionPermitted(FlowStatusGenerator 
flowStatusGenerator, String flowGroup, String flowName,
-      boolean allowConcurrentExecution, long flowExecutionId) {
-    return allowConcurrentExecution || 
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+  private boolean isExecutionPermitted(String flowGroup, String flowName, 
boolean allowConcurrentExecution)
+      throws IOException {
+    return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName, 
dagManagementStateStore);
+  }
+
+  /**
+   * Returns true if any previous execution for the flow determined by the 
provided flowGroup, flowName is running.
+   * We ignore the execution that has the provided flowExecutionId. We also 
ignore the flows that are running beyond

Review Comment:
   In the comment, please elaborate on:
   > We ignore the execution that has the provided flowExecutionId.
   so that it explains the re-attempt of the same `DagAction`, as described in 
the discussion here.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -195,19 +196,19 @@ public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Conf
    * @param allowConcurrentExecution
    * @return true if the {@link FlowSpec} allows concurrent executions or if 
no other instance of the flow is currently RUNNING.
    */
-  private boolean isExecutionPermitted(String flowGroup, String flowName, 
boolean allowConcurrentExecution)
+  private boolean isExecutionPermitted(String flowGroup, String flowName, long 
flowExecutionId, boolean allowConcurrentExecution)
       throws IOException {
-    return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName, 
dagManagementStateStore);
+    return allowConcurrentExecution || 
!isFlowBeforeThisExecutionRunning(flowGroup, flowName, flowExecutionId, 
dagManagementStateStore);
   }
 
   /**
-   * Returns true if any previous execution for the flow determined by the 
provided flowGroup, flowName is running.
+   * Returns true if any previous execution for the flow determined by the 
provided flowGroup, flowName, flowExecutionId is running.
    * We ignore the execution that has the provided flowExecutionId. We also 
ignore the flows that are running beyond
    * the job start deadline and flow finish deadline.
    * If this method returns `false`, callers may start a flow and subsequent 
calls to this method may return `true`.
    */
   @VisibleForTesting
-  static boolean isFlowRunning(String flowGroup, String flowName, 
DagManagementStateStore dagManagementStateStore)
+  static boolean isFlowBeforeThisExecutionRunning(String flowGroup, String 
flowName, long flowExecutionId, DagManagementStateStore dagManagementStateStore)

Review Comment:
   Not a big deal, but I suggest naming this `isPriorFlowExecutionRunning`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -165,7 +165,8 @@ public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Conf
     }
     addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
 
-    if (isExecutionPermitted(flowGroup, flowName, allowConcurrentExecution)) {
+    if (isExecutionPermitted(flowGroup, flowName,
+        Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)), allowConcurrentExecution)) {

Review Comment:
   Suggest parsing this along with the other values at the start of:
   ```
      public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec)
         throws IOException, InterruptedException {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
       ...
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -216,6 +217,12 @@ static boolean isFlowRunning(String flowGroup, String 
flowName, DagManagementSta
     }
 
     for (FlowStatus flowStatus : flowStatusList) {
+      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+        // a duplicate call to this method indicates that the prior caller of 
this method could not complete the required action,
+        // so we ignore any flow status for the current execution to give the 
caller another chance to complete them
+        continue;

Review Comment:
   Given that we expect re-attempts to be rare, let's at least log this 
condition. We really expect it only in the `COMPILED` state, so log that case 
at `.info` level. Any other status merits `.warn`.
   
   Perhaps if the state is actually `RUNNING`, it should even be `.error` level.
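
   The severity mapping suggested above could be sketched roughly as follows. 
This is only an illustration, not the PR's code: `Status` is a hypothetical 
stand-in for Gobblin's `ExecutionStatus`, and `java.util.logging.Level` is used 
only to keep the example self-contained (`INFO`, `WARNING`, and `SEVERE` 
correspond to the `.info`, `.warn`, and `.error` calls mentioned in the comment).

```java
import java.util.logging.Level;

public class ReattemptLogLevel {
  // Hypothetical stand-in for Gobblin's ExecutionStatus; only a few values are modeled.
  public enum Status { COMPILED, PENDING, RUNNING, COMPLETE }

  // Chooses a log severity for a flow status whose flowExecutionId matches
  // the execution currently being validated (i.e. a re-attempt).
  public static Level levelFor(Status status) {
    switch (status) {
      case COMPILED:
        return Level.INFO;     // the expected state for a re-attempt
      case RUNNING:
        return Level.SEVERE;   // same execution already running: most suspicious
      default:
        return Level.WARNING;  // any other status is unexpected
    }
  }
}
```

   In the real method the chosen level would presumably decide which logger 
call is made just before the `continue`.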



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java:
##########
@@ -156,48 +161,61 @@ public void 
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithi
     String flowGroup = "fg";
     String flowName = "fn";
     long flowFinishDeadline = 10000L;
-    long flowStartTime = System.currentTimeMillis();  // giving test 
flowFinishDeadline to finish
+    long flowStartTime = System.currentTimeMillis() - 1 ;  // giving test 
flowFinishDeadline + 1 ms to finish
+    long currentFlowExecutionId = System.currentTimeMillis() ;
 
     insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, 
ExecutionStatus.RUNNING,
         ConfigFactory.empty()
             
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
             .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, 
ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
 
-    Assert.assertTrue(FlowCompilationValidationHelper.isFlowRunning(flowGroup, 
flowName, this.dagManagementStateStore));
+    Assert.assertTrue(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup, flowName,
+        currentFlowExecutionId, this.dagManagementStateStore));
   }
 
   @Test
   public void testConcurrentFlowNoPreviousExecutionRunning() throws 
IOException, URISyntaxException {
     String flowGroup = "fg";
     String flowName = "fn";
-    long currentFlowExecutionId = 67890L;
-    long flowFinishDeadline = 10000L;
     long flowStartTime = System.currentTimeMillis();  // giving test 
flowFinishDeadline to finish
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, 
ExecutionStatus.PENDING,
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME, 
ConfigValueFactory.fromAnyRef(flowStartTime)));
+
+    // change the mock to not return any previous flow status
     when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), 
anyString())).thenReturn(Collections.emptyList());
-    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", 
currentFlowExecutionId,
-        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, 
"user5", ConfigFactory.empty()
-            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
-            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
-            
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
-            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, 
ConfigValueFactory.fromAnyRef(flowFinishDeadline))
-            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
-    dag.getNodes().forEach(node -> 
node.getValue().setFlowStartTime(flowStartTime));
-    this.dagManagementStateStore.addDag(dag);
 
-    Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup, flowName, this.dagManagementStateStore));
+    Assert.assertFalse(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup, flowName,
+        flowStartTime, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowCurrentExecutionWithNonTerminalStatusRunningWithinJobStartDeadline() throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long jobStartDeadline = 10000L;
+    long flowStartTime = System.currentTimeMillis();
+
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, 
ExecutionStatus.PENDING,
+        ConfigFactory.empty()

Review Comment:
   In this case, where the only flow status found is for the execution 
currently being validated, that would arise from a re-attempt of the same 
`DagActionType.LAUNCH`, correct? If so, would we expect 
`ExecutionStatus.COMPILED` or actually `PENDING`? If the former, let's ensure 
we have a test case for that.
   
   Also, I suggest naming the method to make clear this is not actually a 
"concurrent" execution, but the "same" execution. Maybe 
`testSameFlowExecAlreadyCompiledWithinJobStartDeadline()`.
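
   The "same execution" scenario can be modeled in isolation with a minimal 
sketch like the one below. Everything in it is hypothetical: `Status` and 
`Entry` are stand-ins rather than Gobblin's `ExecutionStatus` and `FlowStatus`, 
the method only borrows the name suggested in this review, and the job start 
and flow finish deadline handling of the real method is deliberately left out.

```java
import java.util.List;

public class SameExecutionCheckSketch {
  // Hypothetical stand-in for ExecutionStatus; COMPLETE is the only terminal value modeled.
  public enum Status { COMPILED, PENDING, RUNNING, COMPLETE }

  // Hypothetical stand-in for a flow status record.
  public static final class Entry {
    final long flowExecutionId;
    final Status status;

    public Entry(long flowExecutionId, Status status) {
      this.flowExecutionId = flowExecutionId;
      this.status = status;
    }
  }

  // Returns true only if an execution OTHER than the one being validated
  // is in a non-terminal state. Deadlines are not modeled in this sketch.
  public static boolean isPriorFlowExecutionRunning(List<Entry> statuses, long flowExecutionId) {
    for (Entry entry : statuses) {
      if (entry.flowExecutionId == flowExecutionId) {
        continue; // re-attempt of the same launch: ignore our own status
      }
      if (entry.status != Status.COMPLETE) {
        return true;
      }
    }
    return false;
  }
}
```

   Under these assumptions, a lone `COMPILED` entry carrying the same 
`flowExecutionId` does not block the launch, while a `RUNNING` entry with a 
different id does. That is the distinction the proposed test name is meant to 
convey.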





Issue Time Tracking
-------------------

    Worklog Id:     (was: 934130)
    Time Spent: 2.5h  (was: 2h 20m)

> ignore flows that are running beyond job start and flow finish deadline
> -----------------------------------------------------------------------
>
>                 Key: GOBBLIN-2151
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2151
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
