This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f6ba4731f6 [GOBBLIN-2145] fix bug in getting flow status (#4040)
f6ba4731f6 is described below
commit f6ba4731f6e47a5b0701341e5f6d7e9925946119
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Aug 28 21:00:13 2024 -0700
[GOBBLIN-2145] fix bug in getting flow status (#4040)
* fix bug in getting flow status
---
.../service/monitoring/FlowStatusGenerator.java | 2 +-
.../monitoring/FlowStatusGeneratorTest.java | 24 +++++++++++-----------
2 files changed, 13 insertions(+), 13 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index b4ec8c3530..dea106c294 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -154,7 +154,7 @@ public class FlowStatusGenerator {
* @return true, if any jobs of the flow are RUNNING.
*/
public boolean isFlowRunning(String flowName, String flowGroup, long
flowExecutionId) {
- List<FlowStatus> flowStatusList =
jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup);
+ List<FlowStatus> flowStatusList =
jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName);
if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 9e84a1fa03..00ad043df8 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -43,7 +43,7 @@ public class FlowStatusGeneratorTest {
String flowName = "testName";
String flowGroup = "testGroup";
long currFlowExecutionId = 1234L;
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(null);
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName)).thenReturn(null);
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
currFlowExecutionId));
@@ -59,7 +59,7 @@ public class FlowStatusGeneratorTest {
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
// Block the next execution if the prior one is in compiled as it's
considered still running
@@ -78,15 +78,15 @@ public class FlowStatusGeneratorTest {
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
// If the flow is compiled but the flow execution status is the same as
the one about to be kicked off, do not consider it as running.
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
}
- @Test
- public void testIsFlowRunningJobExecutionIgnored() {
+ @Test
+ public void testIsFlowRunningJobExecutionIgnored() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
@@ -109,14 +109,14 @@ public class FlowStatusGeneratorTest {
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(Lists.newArrayList(flowStatus));
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName)).thenReturn(Lists.newArrayList(flowStatus));
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
jobStatus4 =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
jobStatus4).iterator();
flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(Collections.singletonList(flowStatus));
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName)).thenReturn(Collections.singletonList(flowStatus));
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId+1));
}
@@ -186,7 +186,7 @@ public class FlowStatusGeneratorTest {
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
// Mocking the retrieval of empty flowStatusList
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName))
.thenReturn(Collections.emptyList());
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
@@ -207,7 +207,7 @@ public class FlowStatusGeneratorTest {
createFlowStatus(flowName, flowGroup, flowExecutionId, "CANCELLED")
);
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName))
.thenReturn(flowStatusList);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
@@ -229,9 +229,9 @@ public class FlowStatusGeneratorTest {
createFlowStatus(flowName, flowGroup, flowExecutionId+1, "FAILED"),
createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE")
- );
+ );
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName))
.thenReturn(flowStatusList);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId + 1));
@@ -250,7 +250,7 @@ public class FlowStatusGeneratorTest {
createFlowStatus(flowName, flowGroup, flowExecutionId, "RUNNING")
);
-
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName))
.thenReturn(flowStatusList);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));