This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4721d086f [GOBBLIN-2102]Concurrent flow status check fix (#3989)
4721d086f is described below
commit 4721d086f78553cab28e46bf406787df6cc2dea8
Author: pratapaditya04 <[email protected]>
AuthorDate: Sat Jun 29 00:10:20 2024 +0530
[GOBBLIN-2102]Concurrent flow status check fix (#3989)
Modified isFlowRunning to check the statuses of all executions of the flow instead of
just the latest one.
---------
Co-authored-by: Aditya Pratap Singh <[email protected]>
---
.../org/apache/gobblin/service/FlowStatusTest.java | 6 ++
.../service/monitoring/FlowStatusGenerator.java | 18 ++--
.../service/monitoring/JobStatusRetriever.java | 32 +++++-
.../monitoring/FlowStatusGeneratorTest.java | 112 ++++++++++++++++++---
.../service/monitoring/FsJobStatusRetriever.java | 17 ++++
.../monitoring/LocalFsJobStatusRetriever.java | 35 ++++++-
.../monitoring/MysqlJobStatusRetriever.java | 8 ++
.../service/monitoring/JobStatusRetrieverTest.java | 67 +++++++++++-
8 files changed, 268 insertions(+), 27 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 4888a2c23..d4950fcf3 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -91,6 +91,12 @@ public class FlowStatusTest {
int countJobStatusesPerFlowName) {
return Lists.newArrayList(); // (as this method not exercised within
`FlowStatusResource`)
}
+
+ @Override
+ public List<org.apache.gobblin.service.monitoring.FlowStatus>
getAllFlowStatusesForFlowExecutionsOrdered(
+ String flowGroup, String flowName) {
+ return Lists.newArrayList();// (as this method not exercised within
`FlowStatusResource`)
+ }
}
@BeforeClass
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index c6183cc52..b4ec8c353 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -154,16 +154,22 @@ public class FlowStatusGenerator {
* @return true, if any jobs of the flow are RUNNING.
*/
public boolean isFlowRunning(String flowName, String flowGroup, long
flowExecutionId) {
- List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup,
1, null);
+ List<FlowStatus> flowStatusList =
jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup);
+
if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
- } else {
- FlowStatus flowStatus = flowStatusList.get(0);
+ }
+ // Iterating through all flow statuses to check the condition
+ for (FlowStatus flowStatus : flowStatusList) {
ExecutionStatus flowExecutionStatus =
flowStatus.getFlowExecutionStatus();
- log.info("Comparing flow execution status with flowExecutionId: " +
flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + "
with incoming flowExecutionId: " + flowExecutionId);
- // If the latest flow status is the current job about to get kicked off,
we should ignore this check
- return flowStatus.getFlowExecutionId() != flowExecutionId &&
!FINISHED_STATUSES.contains(flowExecutionStatus.name());
+ // Check if it is not the current flowExecutionId and the status is not
in FINISHED_STATUSES
+ if (flowStatus.getFlowExecutionId() != flowExecutionId &&
!FINISHED_STATUSES.contains(flowExecutionStatus.name())) {
+ log.info("Comparing flow execution status with flowExecutionId: " +
flowStatus.getFlowExecutionId()
+ + " and flowStatus: " + flowExecutionStatus + " with incoming
flowExecutionId: " + flowExecutionId);
+ return true;
+ }
}
+ return false; // Return false if all flow statuses are in terminal status
}
/** @return only `jobStatuses` that represent a flow or, when `tag != null`,
represent a job tagged as `tag` */
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index bcbcc15fa..98889ac70 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.monitoring;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
@@ -81,6 +82,14 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
*/
public abstract List<FlowStatus>
getFlowStatusesForFlowGroupExecutions(String flowGroup, int
countJobStatusesPerFlowName);
+ /**
+ * Get all the {@link FlowStatus}es of executions of flows belonging to
this flow group and flowName. Currently, latest flow execution
+ * is decided by comparing {@link JobStatus#getFlowExecutionId()}.
+ * @return `FlowStatus`es are ordered by descending flowExecutionId.
+ **/
+ public abstract List<FlowStatus>
getAllFlowStatusesForFlowExecutionsOrdered(String flowGroup,String flowName);
+
+
public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
List<Long> lastKExecutionIds = getLatestExecutionIdsForFlow(flowName,
flowGroup, 1);
return lastKExecutionIds != null && !lastKExecutionIds.isEmpty() ?
lastKExecutionIds.get(0) : -1L;
@@ -202,9 +211,18 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
private final long flowExecutionId;
private final List<State> jobStates;
}
-
+ /**
+ * Groups job status states by flow execution IDs optionally limiting the
number of executions per flow name.
+ *
+ * @param flowGroup The group to which the flow executions belong.
+ * @param jobStatusStates List of job status states to process.
+ * @param maxCountPerFlowName Maximum number of executions to retain per
flow name.
+ * If null, all executions are returned
+ * @return List of FlowExecutionJobStateGrouping objects containing the
latest job states
+ * grouped by flow execution ID and sorted by flow name in ascending
order.
+ */
protected List<FlowExecutionJobStateGrouping>
groupByFlowExecutionAndRetainLatest(
- String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
+ String flowGroup, List<State> jobStatusStates, Integer
maxCountPerFlowName) {
Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
jobStatusStates.stream().collect(
Collectors.groupingBy(JobStatusRetriever::getFlowName,
Collectors.groupingBy(JobStatusRetriever::getFlowExecutionId)));
@@ -212,7 +230,15 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
String flowName = flowNameEntry.getKey();
Map<Long, List<State>> statesByFlowExecutionIdForName =
flowNameEntry.getValue();
- List<Long> executionIds =
Ordering.<Long>natural().greatestOf(statesByFlowExecutionIdForName.keySet(),
maxCountPerFlowName);
+ List<Long> executionIds;
+ if (maxCountPerFlowName != null) {
+ // If maxCountPerFlowName is specified, limit the number of executions
per flow name
+ executionIds =
Ordering.natural().greatestOf(statesByFlowExecutionIdForName.keySet(),
maxCountPerFlowName);
+ } else {
+ // If maxCountPerFlowName is not specified (null), return all
execution IDs sorted in descending order
+ executionIds = new
ArrayList<>(statesByFlowExecutionIdForName.keySet());
+ executionIds.sort(Comparator.reverseOrder());
+ }
return executionIds.stream().map(executionId ->
new FlowExecutionJobStateGrouping(flowGroup, flowName, executionId,
statesByFlowExecutionIdForName.get(executionId)));
}).collect(Collectors.toList());
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 109c71f40..9e84a1fa0 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -43,7 +43,7 @@ public class FlowStatusGeneratorTest {
String flowName = "testName";
String flowGroup = "testGroup";
long currFlowExecutionId = 1234L;
- when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(null);
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(null);
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
currFlowExecutionId));
@@ -55,13 +55,12 @@ public class FlowStatusGeneratorTest {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
- when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(
- Lists.newArrayList(flowExecutionId));
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
- when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(
- jobStatusIterator);
+ FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(
+ Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
// Block the next execution if the prior one is in compiled as it's
considered still running
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId + 1));
@@ -78,8 +77,9 @@ public class FlowStatusGeneratorTest {
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
- when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(
- jobStatusIterator);
+ FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(
+ Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
// If the flow is compiled but the flow execution status is the same as
the one about to be kicked off, do not consider it as running.
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
@@ -103,18 +103,20 @@ public class FlowStatusGeneratorTest {
.jobName(job2).eventName("FAILED").build();
JobStatus jobStatus3 =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(job3).eventName("CANCELLED").build();
- JobStatus flowStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+ JobStatus jobStatus4 =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
- Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1,
jobStatus2, jobStatus3, flowStatus).iterator();
+ Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1,
jobStatus2, jobStatus3, jobStatus4).iterator();
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
- when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+ FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(Lists.newArrayList(flowStatus));
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
- flowStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+ jobStatus4 =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
- jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
flowStatus).iterator();
- when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+ jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
jobStatus4).iterator();
+ flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusIterator));
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup)).thenReturn(Collections.singletonList(flowStatus));
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId+1));
}
@@ -175,6 +177,90 @@ public class FlowStatusGeneratorTest {
Arrays.asList(f0jsmDep2)));
}
+ @Test
+ public void testIsFlowRunning_NoFlowStatuses_ReturnsFalse() {
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+ FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
+
+ // Mocking the retrieval of empty flowStatusList
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+ .thenReturn(Collections.emptyList());
+
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
+ }
+
+ @Test
+ public void testIsFlowRunning_AllFinishedFlowStatuses_ReturnsFalse() {
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+ FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
+
+ // Mocking flowStatusList with all finished statuses
+ List<FlowStatus> flowStatusList = Arrays.asList(
+ createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE"),
+ createFlowStatus(flowName, flowGroup, flowExecutionId, "FAILED"),
+ createFlowStatus(flowName, flowGroup, flowExecutionId, "CANCELLED")
+ );
+
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+ .thenReturn(flowStatusList);
+
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
+ }
+
+ @Test
+ public void
testIsFlowRunning_FlowStatusNotMatchingFlowExecutionIdAndOneOfTheStatusIsRunning_ReturnsTrue()
{
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+ FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
+
+ // Mocking flowStatusList with a running status and a different flow
execution id
+ List<FlowStatus> flowStatusList = Arrays.asList(
+ createFlowStatus(flowName, flowGroup, flowExecutionId+4, "COMPLETE"),
+ createFlowStatus(flowName, flowGroup, flowExecutionId+3, "COMPLETE"),
+ createFlowStatus(flowName, flowGroup, flowExecutionId+2, "RUNNING"),
+ createFlowStatus(flowName, flowGroup, flowExecutionId+1, "FAILED"),
+ createFlowStatus(flowName, flowGroup, flowExecutionId, "COMPLETE")
+
+ );
+
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+ .thenReturn(flowStatusList);
+
+ Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId + 1));
+ }
+
+ @Test
+ public void
testIsFlowRunning_FlowStatusMatchingFlowExecutionId_ReturnsFalse() {
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+ FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
+
+ // Mocking flowStatusList with a running status and the same flow
execution id
+ List<FlowStatus> flowStatusList = Collections.singletonList(
+ createFlowStatus(flowName, flowGroup, flowExecutionId, "RUNNING")
+ );
+
+
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowName,
flowGroup))
+ .thenReturn(flowStatusList);
+
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
+ }
+
+ private FlowStatus createFlowStatus(String flowName, String flowGroup, long
flowExecutionId, String status) {
+ ExecutionStatus executionStatus = ExecutionStatus.valueOf(status);
+ return new FlowStatus(flowName, flowGroup, flowExecutionId, null,
executionStatus);
+ }
+
private FlowStatus createFlowStatus(String flowGroup, String flowName, long
flowExecutionId, List<JobStatus> jobStatuses) {
return new FlowStatus(flowName, flowGroup, flowExecutionId,
jobStatuses.iterator(),
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index e27ce534e..f2aeaba2d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -131,6 +131,23 @@ public class FsJobStatusRetriever extends
JobStatusRetriever {
}
}
+ @Override
+ public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String
flowGroup, String flowName) {
+ Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+ Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+ try {
+ String storeNamePrefix =
KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName);
+ List<String> storeNamesForFlowGroup = stateStore.getStoreNames(storeName
-> storeName.startsWith(storeNamePrefix));
+ List<State> flowGroupExecutionsStates =
storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapToUnchecked(storeName
->
+ stateStore.getAll(storeName).stream()
+ )).collect(Collectors.toList());
+ return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup,
flowGroupExecutionsStates, null));
+ } catch (IOException | RuntimeException e) { // (latter likely wrapping
`IOException` originating within `wrapUnchecked`)
+ log.error(String.format("Exception encountered when listing files for
flow group: %s", flowGroup), e);
+ return ImmutableList.of();
+ }
+ }
+
/**
* @param flowName
* @param flowGroup
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 8a9346571..72561d39a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -90,11 +90,25 @@ public class LocalFsJobStatusRetriever extends
JobStatusRetriever {
String JOB_DONE_SUFFIX = ".done";
if (this.doesJobExist(flowName, flowGroup, flowExecutionId,
JOB_DONE_SUFFIX)) {
- jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.COMPLETE.name()).build();
+ jobStatus = JobStatus.builder()
+ .flowName(flowName)
+ .flowGroup(flowGroup)
+ .flowExecutionId(flowExecutionId)
+ .jobName(jobName)
+ .jobGroup(jobGroup)
+ .jobExecutionId(flowExecutionId)
+ .eventName(ExecutionStatus.COMPLETE.name())
+ .build();
} else if (this.doesJobExist(flowName, flowGroup, flowExecutionId, "")) {
- jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
-
jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(ExecutionStatus.PENDING.name()).build();
+ jobStatus = JobStatus.builder()
+ .flowName(flowName)
+ .flowGroup(flowGroup)
+ .flowExecutionId(flowExecutionId)
+ .jobName(jobName)
+ .jobGroup(jobGroup)
+ .jobExecutionId(flowExecutionId)
+ .eventName(ExecutionStatus.PENDING.name())
+ .build();
} else {
return Collections.emptyIterator();
}
@@ -131,6 +145,19 @@ public class LocalFsJobStatusRetriever extends
JobStatusRetriever {
throw new UnsupportedOperationException("Not yet implemented");
}
+ /**
+ * @param flowGroup
+ * @param flowName
+ * @return all the flow statuses for the given flowGroup and flowName.
+ */
+ @Override
+ public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String
flowGroup, String flowName) {
+ Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+ Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
public StateStore<State> getStateStore() {
// this jobstatus retriever does not have a state store
// only used in tests so this is okay
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index d323d710c..81d8776c5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -95,6 +95,14 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup,
jobStatusStates, countJobStatusesPerFlowName));
}
+ @Override
+ public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String
flowGroup, String flowName) {
+ String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
+ List<State> jobStatusStates = timeOpAndWrapIOException(() ->
this.stateStore.getAllWithPrefix(storeName),
+ GET_LATEST_FLOW_GROUP_STATUS_METRIC);
+ return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup,
jobStatusStates,null));
+ }
+
@Override
public List<Long> getLatestExecutionIdsForFlow(String flowName, String
flowGroup, int count) {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index ca505f73e..1b8a496f9 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -44,9 +45,13 @@ public abstract class JobStatusRetrieverTest {
protected static final String FLOW_NAME = "myFlowName";
protected static final String FLOW_GROUP_ALT_A = "myFlowGroup-alt-A";
protected static final String FLOW_GROUP_ALT_B = "myFlowGroup-alt-B";
+ protected static final String FLOW_GROUP_ALT_C = "myFlowGroup-alt-C";
+ protected static final String FLOW_GROUP_ALT_D = "myFlowGroup-alt-D";
protected static final String FLOW_NAME_ALT_1 = "myFlowName-alt-1";
protected static final String FLOW_NAME_ALT_2 = "myFlowName-alt-2";
protected static final String FLOW_NAME_ALT_3 = "myFlowName-alt-3";
+ protected static final String FLOW_NAME_ALT_4 = "myFlowName-alt-4";
+ protected static final String FLOW_NAME_ALT_5 = "myFlowName-alt-5";
protected String jobGroup;
private static final String MY_JOB_GROUP = "myJobGroup";
protected static final String MY_JOB_NAME_1 = "myJobName1";
@@ -280,7 +285,7 @@ public abstract class JobStatusRetrieverTest {
}
@Test
- public void testGetFlowStatusesForFlowGroupExecutions() throws IOException {
+ public void testGetFlowStatusesForFlowGroupExecutions() throws Exception {
// a.) simplify to begin, in `FLOW_GROUP_ALT_A`, leaving out job-level
status
addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 101L,
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_A, FLOW_NAME, 102L,
JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
@@ -332,6 +337,66 @@ public abstract class JobStatusRetrieverTest {
JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L,
ExecutionStatus.COMPLETE.name()),
JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L,
ExecutionStatus.ORCHESTRATED.name()))));
}
+ @Test
+ public void testAllFlowStatusesForFlowExecutionsOrdered() throws Exception {
+ // a.) simplify to begin, in `FLOW_GROUP_ALT_A`, leaving out job-level
status
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 101L,
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 102L,
JobStatusRetriever.NA_KEY, ExecutionStatus.RUNNING.name());
+
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 111L,
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 121L,
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 122L,
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPILED.name());
+
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 131L,
JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 132L,
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 133L,
JobStatusRetriever.NA_KEY, ExecutionStatus.PENDING_RESUME.name());
+
+ // b.) include job-level status, in `FLOW_GROUP_ALT_B`
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 211L,
JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 211L,
MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 231L,
JobStatusRetriever.NA_KEY, ExecutionStatus.COMPLETE.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 231L,
MY_JOB_NAME_1, ExecutionStatus.FAILED.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 231L,
MY_JOB_NAME_2, ExecutionStatus.COMPLETE.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 232L,
JobStatusRetriever.NA_KEY, ExecutionStatus.FAILED.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 233L,
JobStatusRetriever.NA_KEY, ExecutionStatus.ORCHESTRATED.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 233L,
MY_JOB_NAME_1, ExecutionStatus.COMPLETE.name());
+ addFlowIdJobStatusToStateStore(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5, 233L,
MY_JOB_NAME_2, ExecutionStatus.ORCHESTRATED.name());
+
+ List<FlowStatus> flowStatusesForGroupAltA =
this.jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(FLOW_GROUP_ALT_C,
FLOW_NAME_ALT_4);
+ Assert.assertEquals(flowStatusesForGroupAltA.size(), 8);
+
+ assertThat(flowStatusesForGroupAltA.get(0),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 133L,
ExecutionStatus.PENDING_RESUME));
+ assertThat(flowStatusesForGroupAltA.get(1),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 132L,
ExecutionStatus.COMPLETE));
+ assertThat(flowStatusesForGroupAltA.get(2),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 131L,
ExecutionStatus.FAILED));
+
+ assertThat(flowStatusesForGroupAltA.get(3),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 122L,
ExecutionStatus.COMPILED));
+
+ assertThat(flowStatusesForGroupAltA.get(4),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 121L,
ExecutionStatus.COMPLETE));
+ assertThat(flowStatusesForGroupAltA.get(5),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 111L,
ExecutionStatus.COMPILED));
+
+ assertThat(flowStatusesForGroupAltA.get(6),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 102L,
ExecutionStatus.RUNNING));
+ assertThat(flowStatusesForGroupAltA.get(7),
FlowStatusMatch.of(FLOW_GROUP_ALT_C, FLOW_NAME_ALT_4, 101L,
ExecutionStatus.COMPILED));
+
+
+ List<FlowStatus> flowStatusesForGroupAltB =
this.jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(FLOW_GROUP_ALT_D,
FLOW_NAME_ALT_5);
+ Assert.assertEquals(flowStatusesForGroupAltB.size(), 4);
+
+ assertThat(flowStatusesForGroupAltB.get(0),
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5,
233L, ExecutionStatus.ORCHESTRATED,
+ ImmutableList.of(
+ JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L,
ExecutionStatus.COMPLETE.name()),
+ JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L,
ExecutionStatus.ORCHESTRATED.name()))));
+
+ assertThat(flowStatusesForGroupAltB.get(1),
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5,
232L, ExecutionStatus.FAILED,
+ Collections.emptyList()));
+ assertThat(flowStatusesForGroupAltB.get(2),
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5,
231L, ExecutionStatus.COMPLETE,
+ ImmutableList.of(
+ JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_1, 1111L,
ExecutionStatus.FAILED.name()),
+ JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L,
ExecutionStatus.COMPLETE.name()))));
+ assertThat(flowStatusesForGroupAltB.get(3),
FlowStatusMatch.withDependentJobStatuses(FLOW_GROUP_ALT_D, FLOW_NAME_ALT_5,
211L, ExecutionStatus.FAILED,
+ ImmutableList.of(
+ JobStatusMatch.Dependent.of(MY_JOB_GROUP, MY_JOB_NAME_2, 1111L,
ExecutionStatus.ORCHESTRATED.name()))));
+ }
abstract void cleanUpDir() throws Exception;