phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1753022115
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -187,9 +194,59 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowGroup, String flowName,
- boolean allowConcurrentExecution, long flowExecutionId) {
- return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+ private boolean isExecutionPermitted(String flowGroup, String flowName,
boolean allowConcurrentExecution)
+ throws IOException {
+ return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName,
dagManagementStateStore);
+ }
+
+ /**
+ * Returns true if any previous execution for the flow determined by the
provided flowGroup, flowName is running.
+ * We ignore the execution that has the provided flowExecutionId. We also
ignore the flows that are running beyond
Review Comment:
In the comment, please elaborate on:
> We ignore the execution that has the provided flowExecutionId.
so that it explains the re-attempt of the same `DagAction`, as described in the
discussion here
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -195,19 +196,19 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean isExecutionPermitted(String flowGroup, String flowName,
boolean allowConcurrentExecution)
+ private boolean isExecutionPermitted(String flowGroup, String flowName, long
flowExecutionId, boolean allowConcurrentExecution)
throws IOException {
- return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName,
dagManagementStateStore);
+ return allowConcurrentExecution ||
!isFlowBeforeThisExecutionRunning(flowGroup, flowName, flowExecutionId,
dagManagementStateStore);
}
/**
- * Returns true if any previous execution for the flow determined by the
provided flowGroup, flowName is running.
+ * Returns true if any previous execution for the flow determined by the
provided flowGroup, flowName, flowExecutionId is running.
* We ignore the execution that has the provided flowExecutionId. We also
ignore the flows that are running beyond
* the job start deadline and flow finish deadline.
* If this method returns `false`, callers may start a flow and subsequent
calls to this method may return `true`.
*/
@VisibleForTesting
- static boolean isFlowRunning(String flowGroup, String flowName,
DagManagementStateStore dagManagementStateStore)
+ static boolean isFlowBeforeThisExecutionRunning(String flowGroup, String
flowName, long flowExecutionId, DagManagementStateStore dagManagementStateStore)
Review Comment:
Not a big deal, but I'd suggest the name `isPriorFlowExecutionRunning`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -165,7 +165,8 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
- if (isExecutionPermitted(flowGroup, flowName, allowConcurrentExecution)) {
+ if (isExecutionPermitted(flowGroup, flowName,
+
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)),
allowConcurrentExecution)) {
Review Comment:
Suggest doing this along with the other extractions at the start of:
```
public Optional<Dag<JobExecutionPlan>>
createExecutionPlanIfValid(FlowSpec flowSpec)
throws IOException, InterruptedException {
Config flowConfig = flowSpec.getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
...
```
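For illustration only, a minimal sketch of pulling the ID out once into a local, next to `flowGroup` and `flowName`. The class name, the helper `extractFlowExecutionId`, and the literal key string are all hypothetical stand-ins; the real code reads `TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD` from `flowMetadata`, and this assumes the ID is already present in the metadata at that point.

```java
import java.util.HashMap;
import java.util.Map;

final class FlowExecutionIdSketch {
  // Hypothetical key; stands in for TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD.
  static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";

  // Parse the ID once so later calls can take a plain long instead of re-parsing inline.
  static long extractFlowExecutionId(Map<String, String> flowMetadata) {
    String raw = flowMetadata.get(FLOW_EXECUTION_ID_FIELD);
    if (raw == null) {
      throw new IllegalStateException("flow metadata has no " + FLOW_EXECUTION_ID_FIELD);
    }
    return Long.parseLong(raw);
  }

  public static void main(String[] args) {
    Map<String, String> flowMetadata = new HashMap<>();
    flowMetadata.put(FLOW_EXECUTION_ID_FIELD, "1726000000000");
    long flowExecutionId = extractFlowExecutionId(flowMetadata);
    System.out.println("flowExecutionId = " + flowExecutionId);
  }
}
```

The call site would then read roughly `isExecutionPermitted(flowGroup, flowName, flowExecutionId, allowConcurrentExecution)`, without the inline `Long.parseLong(...)`.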
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -216,6 +217,12 @@ static boolean isFlowRunning(String flowGroup, String
flowName, DagManagementSta
}
for (FlowStatus flowStatus : flowStatusList) {
+ if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+ // a duplicate call to this method indicate that the prior caller of
this method could not complete the required action,
+ // so we ignore any flow status for the current execution to give the
caller another chance to complete them
+ continue;
Review Comment:
Given that we expect re-attempts to be rare, let's at least log this condition.
We really expect it only in the `COMPILED` state, so log that at `.info` level.
Any other status merits `.warn`, and if the state is actually `RUNNING`,
perhaps it should even be `.error` level.
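A rough sketch of that level selection, as an assumption about what it could look like rather than the actual change. `ExecutionStatus` here is a trimmed stand-in for the real enum, and `LogLevel` and `levelForSameExecution` are hypothetical names; in the real method this would turn into `log.info` / `log.warn` / `log.error` calls just before the `continue`.

```java
final class ReattemptLogLevelSketch {
  // Trimmed stand-in for the real ExecutionStatus enum (assumption: only a few values shown).
  enum ExecutionStatus { COMPILED, PENDING, ORCHESTRATED, RUNNING, COMPLETE, FAILED }

  // Hypothetical level enum so the sketch needs no logging dependency.
  enum LogLevel { INFO, WARN, ERROR }

  // Severity for a flow status whose flowExecutionId equals the one being validated.
  static LogLevel levelForSameExecution(ExecutionStatus status) {
    switch (status) {
      case COMPILED:
        return LogLevel.INFO;   // the expected state for a re-attempt
      case RUNNING:
        return LogLevel.ERROR;  // the same execution is already running
      default:
        return LogLevel.WARN;   // any other status is unexpected
    }
  }

  public static void main(String[] args) {
    for (ExecutionStatus status : ExecutionStatus.values()) {
      System.out.println(status + " -> " + levelForSameExecution(status));
    }
  }
}
```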
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java:
##########
@@ -156,48 +161,61 @@ public void
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithi
String flowGroup = "fg";
String flowName = "fn";
long flowFinishDeadline = 10000L;
- long flowStartTime = System.currentTimeMillis(); // giving test
flowFinishDeadline to finish
+ long flowStartTime = System.currentTimeMillis() - 1 ; // giving test
flowFinishDeadline + 1 ms to finish
+ long currentFlowExecutionId = System.currentTimeMillis() ;
insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.RUNNING,
ConfigFactory.empty()
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
- Assert.assertTrue(FlowCompilationValidationHelper.isFlowRunning(flowGroup,
flowName, this.dagManagementStateStore));
+
Assert.assertTrue(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
flowName,
+ currentFlowExecutionId, this.dagManagementStateStore));
}
@Test
public void testConcurrentFlowNoPreviousExecutionRunning() throws
IOException, URISyntaxException {
String flowGroup = "fg";
String flowName = "fn";
- long currentFlowExecutionId = 67890L;
- long flowFinishDeadline = 10000L;
long flowStartTime = System.currentTimeMillis(); // giving test
flowFinishDeadline to finish
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowStartTime)));
+
+ // change the mock to not return any previous flow status
when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(),
anyString())).thenReturn(Collections.emptyList());
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
currentFlowExecutionId,
- DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5,
"user5", ConfigFactory.empty()
- .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
- .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
-
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline))
- .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
- dag.getNodes().forEach(node ->
node.getValue().setFlowStartTime(flowStartTime));
- this.dagManagementStateStore.addDag(dag);
-
Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup,
flowName, this.dagManagementStateStore));
+
Assert.assertFalse(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
flowName,
+ flowStartTime, this.dagManagementStateStore));
+ }
+
+ @Test
+ public void
testConcurrentFlowCurrentExecutionWithNonTerminalStatusRunningWithinJobStartDeadline()
throws IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long jobStartDeadline = 10000L;
+ long flowStartTime = System.currentTimeMillis();
+
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
Review Comment:
In this case, where the only flow status found is for the execution currently
being validated, that would arise from a re-attempt of the same
`DagActionType.LAUNCH`, correct? If so, would we expect
`ExecutionStatus.COMPILED` or actually `PENDING`? If the former, let's ensure
we have a test case for that.
Also, suggest naming the method to make clear this is not actually a
"concurrent" execution but the "same" one, maybe
`testSameFlowExecAlreadyCompiledWithinJobStartDeadline()`
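To make the scenario concrete, here is a simplified model of the check, not the real method. `Status`, `FlowStatus`, and the list-based signature are hypothetical stand-ins for the Gobblin types, and the job start and flow finish deadline handling is deliberately left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PriorExecutionCheckSketch {
  // Simplified stand-in for ExecutionStatus; COMPLETE is the only terminal state modeled.
  enum Status { COMPILED, PENDING, RUNNING, COMPLETE }

  // Hypothetical minimal flow status: just an execution ID and a status.
  static final class FlowStatus {
    final long flowExecutionId;
    final Status status;

    FlowStatus(long flowExecutionId, Status status) {
      this.flowExecutionId = flowExecutionId;
      this.status = status;
    }
  }

  // True if some execution other than flowExecutionId is in a non-terminal state.
  // Deadline checks from the real method are omitted in this sketch.
  static boolean isPriorFlowExecutionRunning(List<FlowStatus> statuses, long flowExecutionId) {
    for (FlowStatus flowStatus : statuses) {
      if (flowStatus.flowExecutionId == flowExecutionId) {
        continue; // same execution: treated as a re-attempt, so it does not block
      }
      if (flowStatus.status != Status.COMPLETE) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    long current = 100L;
    List<FlowStatus> sameExecOnly = Collections.singletonList(new FlowStatus(current, Status.COMPILED));
    List<FlowStatus> withPrior = Arrays.asList(
        new FlowStatus(current, Status.COMPILED), new FlowStatus(99L, Status.RUNNING));
    System.out.println("same execution only: " + isPriorFlowExecutionRunning(sameExecOnly, current));
    System.out.println("with prior running:  " + isPriorFlowExecutionRunning(withPrior, current));
  }
}
```

A test along the lines of `testSameFlowExecAlreadyCompiledWithinJobStartDeadline()` would cover the first case: the only status belongs to the same execution, it is `COMPILED`, and the check should return `false`.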
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.