[
https://issues.apache.org/jira/browse/GOBBLIN-2151?focusedWorklogId=934130&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934130
]
ASF GitHub Bot logged work on GOBBLIN-2151:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Sep/24 02:30
Start Date: 11/Sep/24 02:30
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1753022115
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -187,9 +194,59 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowGroup, String flowName,
- boolean allowConcurrentExecution, long flowExecutionId) {
- return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+ private boolean isExecutionPermitted(String flowGroup, String flowName,
boolean allowConcurrentExecution)
+ throws IOException {
+ return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName,
dagManagementStateStore);
+ }
+
+ /**
+ * Returns true if any previous execution for the flow determined by the
provided flowGroup, flowName is running.
+ * We ignore the execution that has the provided flowExecutionId. We also
ignore the flows that are running beyond
Review Comment:
in the comment, please elaborate on:
> We ignore the execution that has the provided flowExecutionId.
to talk about the re-attempt of the same `DagAction` as described in the
discussion here
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -195,19 +196,19 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean isExecutionPermitted(String flowGroup, String flowName,
boolean allowConcurrentExecution)
+ private boolean isExecutionPermitted(String flowGroup, String flowName, long
flowExecutionId, boolean allowConcurrentExecution)
throws IOException {
- return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName,
dagManagementStateStore);
+ return allowConcurrentExecution ||
!isFlowBeforeThisExecutionRunning(flowGroup, flowName, flowExecutionId,
dagManagementStateStore);
}
/**
- * Returns true if any previous execution for the flow determined by the
provided flowGroup, flowName is running.
+ * Returns true if any previous execution for the flow determined by the
provided flowGroup, flowName, flowExecutionId is running.
* We ignore the execution that has the provided flowExecutionId. We also
ignore the flows that are running beyond
* the job start deadline and flow finish deadline.
* If this method returns `false`, callers may start a flow and subsequent
calls to this method may return `true`.
*/
@VisibleForTesting
- static boolean isFlowRunning(String flowGroup, String flowName,
DagManagementStateStore dagManagementStateStore)
+ static boolean isFlowBeforeThisExecutionRunning(String flowGroup, String
flowName, long flowExecutionId, DagManagementStateStore dagManagementStateStore)
Review Comment:
NBD, but suggest `isPriorFlowExecutionRunning`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -165,7 +165,8 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
- if (isExecutionPermitted(flowGroup, flowName, allowConcurrentExecution)) {
+ if (isExecutionPermitted(flowGroup, flowName,
+
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)),
allowConcurrentExecution)) {
Review Comment:
suggest parsing the flow execution ID alongside the other values at the start of:
```
public Optional<Dag<JobExecutionPlan>>
createExecutionPlanIfValid(FlowSpec flowSpec)
throws IOException, InterruptedException {
Config flowConfig = flowSpec.getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
...
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -216,6 +217,12 @@ static boolean isFlowRunning(String flowGroup, String
flowName, DagManagementSta
}
for (FlowStatus flowStatus : flowStatusList) {
+ if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+ // a duplicate call to this method indicate that the prior caller of
this method could not complete the required action,
+ // so we ignore any flow status for the current execution to give the
caller another chance to complete them
+ continue;
Review Comment:
given we expect re-attempts to be rare, let's at least log this condition.
we only really expect it in the `COMPILED` state, so `.info` level is enough
there. any other status merits `.warn`, and if the state is actually
`RUNNING`, perhaps it should even be `.error` level
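A minimal sketch of the level selection suggested above, in case it helps the discussion. Everything here is hypothetical: the `Status` enum is only a stand-in for Gobblin's `ExecutionStatus`, `levelFor` is an invented helper name, and `java.util.logging.Level` is used purely to keep the snippet self-contained (`SEVERE` plays the role of `.error`). The real change would presumably go through the class's existing logger.

```java
import java.util.logging.Level;

public class ReattemptLogLevel {
  // Hypothetical stand-in for Gobblin's ExecutionStatus; only a few values are modeled.
  enum Status { COMPILED, PENDING, ORCHESTRATED, RUNNING }

  // Picks a log level for a flow status that belongs to the execution being validated.
  // COMPILED is the expected re-attempt case, RUNNING is the most suspicious one,
  // and every other status falls in between.
  static Level levelFor(Status status) {
    switch (status) {
      case COMPILED:
        return Level.INFO;
      case RUNNING:
        return Level.SEVERE; // closest java.util.logging equivalent of ".error"
      default:
        return Level.WARNING;
    }
  }

  public static void main(String[] args) {
    for (Status status : Status.values()) {
      System.out.println(status + " -> " + levelFor(status));
    }
  }
}
```

Inside the loop, the `continue` branch could look up the level this way before skipping the status for the current execution.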
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java:
##########
@@ -156,48 +161,61 @@ public void
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithi
String flowGroup = "fg";
String flowName = "fn";
long flowFinishDeadline = 10000L;
- long flowStartTime = System.currentTimeMillis(); // giving test
flowFinishDeadline to finish
+ long flowStartTime = System.currentTimeMillis() - 1 ; // giving test
flowFinishDeadline + 1 ms to finish
+ long currentFlowExecutionId = System.currentTimeMillis() ;
insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.RUNNING,
ConfigFactory.empty()
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
- Assert.assertTrue(FlowCompilationValidationHelper.isFlowRunning(flowGroup,
flowName, this.dagManagementStateStore));
+
Assert.assertTrue(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
flowName,
+ currentFlowExecutionId, this.dagManagementStateStore));
}
@Test
public void testConcurrentFlowNoPreviousExecutionRunning() throws
IOException, URISyntaxException {
String flowGroup = "fg";
String flowName = "fn";
- long currentFlowExecutionId = 67890L;
- long flowFinishDeadline = 10000L;
long flowStartTime = System.currentTimeMillis(); // giving test
flowFinishDeadline to finish
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowStartTime)));
+
+ // change the mock to not return any previous flow status
when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(),
anyString())).thenReturn(Collections.emptyList());
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
currentFlowExecutionId,
- DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5,
"user5", ConfigFactory.empty()
- .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
- .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
-
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline))
- .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
- dag.getNodes().forEach(node ->
node.getValue().setFlowStartTime(flowStartTime));
- this.dagManagementStateStore.addDag(dag);
-
Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup,
flowName, this.dagManagementStateStore));
+
Assert.assertFalse(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
flowName,
+ flowStartTime, this.dagManagementStateStore));
+ }
+
+ @Test
+ public void
testConcurrentFlowCurrentExecutionWithNonTerminalStatusRunningWithinJobStartDeadline()
throws IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long jobStartDeadline = 10000L;
+ long flowStartTime = System.currentTimeMillis();
+
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
Review Comment:
in this case, where the only flow status found is for the execution currently
being validated, that would arise from a re-attempt of the same
`DagActionType.LAUNCH`, correct? if so, would we expect
`ExecutionStatus.COMPILED` or actually `PENDING`? if the former, let's ensure
we have a test case for that.
also, suggest naming the method to make clear this is not actually a
"concurrent" execution but the "same" one. maybe
`testSameFlowExecAlreadyCompiledWithinJobStartDeadline()`
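To make the "same execution" scenario concrete, here is a simplified, self-contained model of the check, not the actual Gobblin code. The assumptions: `FlowStatusStub` and `Status` are hypothetical stand-ins for `FlowStatus` and `ExecutionStatus`, the method takes the status list directly instead of a `DagManagementStateStore`, only `COMPLETE` is treated as terminal, and the job start and flow finish deadlines are left out entirely.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SameExecutionCheckSketch {
  // Hypothetical stand-in for ExecutionStatus; COMPLETE is the only terminal value modeled.
  enum Status { COMPILED, PENDING, RUNNING, COMPLETE }

  // Hypothetical stand-in for FlowStatus, reduced to the two fields the check needs.
  static final class FlowStatusStub {
    final long flowExecutionId;
    final Status status;

    FlowStatusStub(long flowExecutionId, Status status) {
      this.flowExecutionId = flowExecutionId;
      this.status = status;
    }
  }

  // Returns true if some execution other than flowExecutionId is still non-terminal.
  // Statuses for flowExecutionId itself are skipped, which models a re-attempt of the
  // same launch. Deadline handling is intentionally omitted from this sketch.
  static boolean isPriorFlowExecutionRunning(List<FlowStatusStub> statuses, long flowExecutionId) {
    for (FlowStatusStub flowStatus : statuses) {
      if (flowStatus.flowExecutionId == flowExecutionId) {
        continue; // same execution: a re-attempt, not a concurrent run
      }
      if (flowStatus.status != Status.COMPLETE) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    long current = 100L;
    List<FlowStatusStub> onlySelf =
        Collections.singletonList(new FlowStatusStub(current, Status.COMPILED));
    List<FlowStatusStub> selfAndPrior = Arrays.asList(
        new FlowStatusStub(current, Status.COMPILED),
        new FlowStatusStub(99L, Status.RUNNING));

    System.out.println(isPriorFlowExecutionRunning(onlySelf, current));
    System.out.println(isPriorFlowExecutionRunning(selfAndPrior, current));
  }
}
```

The first list corresponds to the suggested `COMPILED` test case, where the only status found belongs to the execution being validated. The second adds a genuinely concurrent prior execution.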
Issue Time Tracking
-------------------
Worklog Id: (was: 934130)
Time Spent: 2.5h (was: 2h 20m)
> ignore flows that are running beyond job start and flow finish deadline
> -----------------------------------------------------------------------
>
> Key: GOBBLIN-2151
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2151
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 2.5h
> Remaining Estimate: 0h
>