[
https://issues.apache.org/jira/browse/GOBBLIN-2151?focusedWorklogId=934133&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934133
]
ASF GitHub Bot logged work on GOBBLIN-2151:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Sep/24 02:32
Start Date: 11/Sep/24 02:32
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1753028698
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java:
##########
@@ -156,48 +161,61 @@ public void
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithi
String flowGroup = "fg";
String flowName = "fn";
long flowFinishDeadline = 10000L;
- long flowStartTime = System.currentTimeMillis(); // giving test
flowFinishDeadline to finish
+ long flowStartTime = System.currentTimeMillis() - 1 ; // giving test
flowFinishDeadline + 1 ms to finish
+ long currentFlowExecutionId = System.currentTimeMillis() ;
insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.RUNNING,
ConfigFactory.empty()
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
- Assert.assertTrue(FlowCompilationValidationHelper.isFlowRunning(flowGroup,
flowName, this.dagManagementStateStore));
+
Assert.assertTrue(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
flowName,
+ currentFlowExecutionId, this.dagManagementStateStore));
}
@Test
public void testConcurrentFlowNoPreviousExecutionRunning() throws
IOException, URISyntaxException {
String flowGroup = "fg";
String flowName = "fn";
- long currentFlowExecutionId = 67890L;
- long flowFinishDeadline = 10000L;
long flowStartTime = System.currentTimeMillis(); // giving test
flowFinishDeadline to finish
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowStartTime)));
+
+ // change the mock to not return any previous flow status
when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(),
anyString())).thenReturn(Collections.emptyList());
- Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1",
currentFlowExecutionId,
- DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5,
"user5", ConfigFactory.empty()
- .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
- .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
-
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
- .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline))
- .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
- dag.getNodes().forEach(node ->
node.getValue().setFlowStartTime(flowStartTime));
- this.dagManagementStateStore.addDag(dag);
-
Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup,
flowName, this.dagManagementStateStore));
+
Assert.assertFalse(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
flowName,
+ flowStartTime, this.dagManagementStateStore));
+ }
+
+ @Test
+ public void
testConcurrentFlowCurrentExecutionWithNonTerminalStatusRunningWithinJobStartDeadline()
throws IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long jobStartDeadline = 10000L;
+ long flowStartTime = System.currentTimeMillis();
+
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
Review Comment:
in this case where the only flow status found is for the flowExecId
currently being validated, that arises during a re-attempt of the same
`DagActionType.LAUNCH`, correct?
if so, would we expect the `ExecutionStatus.COMPILED` or actually `PENDING`?
if the former, let's ensure we have a test case for that.
also, suggest method naming to make clear this is not actually "concurrent",
but "same" execution. maybe
`testSameFlowExecAlreadyCompiledWithinJobStartDeadline()`
Issue Time Tracking
-------------------
Worklog Id: (was: 934133)
Time Spent: 3h (was: 2h 50m)
> ignore flows that are running beyond job start and flow finish deadline
> -----------------------------------------------------------------------
>
> Key: GOBBLIN-2151
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2151
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)