This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3e5756234 [GOBBLIN-1999] Ignore concurrent check if the flow execution
ID is the same as the currently running flow execution ID (#3874)
3e5756234 is described below
commit 3e575623413bf83883532478b5b8b70937d92ed9
Author: William Lo <[email protected]>
AuthorDate: Fri Feb 2 17:57:54 2024 -0500
[GOBBLIN-1999] Ignore concurrent check if the flow execution ID is the same
as the currently running flow execution ID (#3874)
* Ignore the concurrency check when the flow execution ID is the same as the
currently running flow execution ID, to handle a race condition in which
concurrent hosts misreport the flow status
* cleanup
---
.../service/monitoring/FlowStatusGenerator.java | 6 ++-
.../monitoring/FlowStatusGeneratorTest.java | 54 ++++++++++++++++++----
.../modules/orchestration/Orchestrator.java | 2 +-
.../utils/FlowCompilationValidationHelper.java | 20 ++++----
4 files changed, 63 insertions(+), 19 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 0bf8b71ea..775fbeb4d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -152,13 +152,17 @@ public class FlowStatusGenerator {
* @param flowGroup
* @return true, if any jobs of the flow are RUNNING.
*/
- public boolean isFlowRunning(String flowName, String flowGroup) {
+ public boolean isFlowRunning(String flowName, String flowGroup, long
flowExecutionId) {
List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup,
1, null);
if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
} else {
FlowStatus flowStatus = flowStatusList.get(0);
ExecutionStatus flowExecutionStatus =
flowStatus.getFlowExecutionStatus();
+ // If the latest flow status is the current job about to get kicked off,
we should ignore this check
+ if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+ return false;
+ }
return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 380177936..601e40e5f 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -38,25 +38,61 @@ import static org.mockito.Mockito.when;
public class FlowStatusGeneratorTest {
@Test
- public void testIsFlowRunning() {
+ public void testIsFlowRunningFirstExecution() {
JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
String flowName = "testName";
String flowGroup = "testGroup";
+ long currFlowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(null);
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
- Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
currFlowExecutionId));
+ }
- //If a flow is COMPILED, isFlowRunning() should return true.
+ @Test
+ public void testIsFlowRunningCompiledPastExecution() {
+ JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+ String flowName = "testName";
+ String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
- when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(
+ jobStatusIterator);
+ FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
+ // Block the next execution if the prior one is in compiled as it's
considered still running
+ Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId + 1));
+ }
+
+ @Test
+ public void skipFlowConcurrentCheckSameFlowExecutionId() {
+ JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(
+ Lists.newArrayList(flowExecutionId));
+ JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+ Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
+ when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(
+ jobStatusIterator);
+ FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
+ // If the flow is compiled but the flow execution status is the same as
the one about to be kicked off, do not consider it as running.
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
+ }
+ @Test
+ public void testIsFlowRunningJobExecutionIgnored() {
+ String flowName = "testName";
+ String flowGroup = "testGroup";
+ long flowExecutionId = 1234L;
+ JobStatusRetriever jobStatusRetriever =
Mockito.mock(JobStatusRetriever.class);
+ when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup,
1)).thenReturn(
+ Lists.newArrayList(flowExecutionId));
//JobStatuses should be ignored, only the flow level status matters.
String job1 = "job1";
String job2 = "job2";
@@ -69,15 +105,17 @@ public class FlowStatusGeneratorTest {
.jobName(job3).eventName("CANCELLED").build();
JobStatus flowStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
- jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
flowStatus).iterator();
+ Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus1,
jobStatus2, jobStatus3, flowStatus).iterator();
+ FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
+
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId));
flowStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
flowStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+ Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup,
flowExecutionId+1));
}
@Test
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 809779f10..618d8b050 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -247,7 +247,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
- flowName);
+ flowName, flowMetadata);
if (!compiledDagOptional.isPresent()) {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index cda69ebd9..7400602f7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -76,9 +76,9 @@ public final class FlowCompilationValidationHelper {
specCompiler.awaitHealthy();
TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_COMPILED);
- Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
- validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName);
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+ Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+ validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName, flowMetadata);
if (!jobExecutionPlanDagOptional.isPresent()) {
return Optional.absent();
@@ -89,7 +89,6 @@ public final class FlowCompilationValidationHelper {
return Optional.absent();
}
- addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDagOptional.get());
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}
@@ -101,13 +100,18 @@ public final class FlowCompilationValidationHelper {
* @throws IOException
*/
public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
- String flowGroup, String flowName) throws IOException {
+ String flowGroup, String flowName, Map<String,String> flowMetadata)
throws IOException {
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
isFlowConcurrencyEnabled);
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ if (jobExecutionPlanDag.isEmpty()) {
+ return Optional.absent();
+ }
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
- if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution)) {
+ if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution,
+
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
{
return Optional.fromNullable(jobExecutionPlanDag);
} else {
log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
@@ -121,9 +125,7 @@ public final class FlowCompilationValidationHelper {
quotaManager.releaseQuota(dagNode);
}
}
-
// Send FLOW_FAILED event
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to
true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter,
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
@@ -140,8 +142,8 @@ public final class FlowCompilationValidationHelper {
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowName, String flowGroup,
- boolean allowConcurrentExecution) {
- return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+ boolean allowConcurrentExecution, long flowExecutionId) {
+ return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
}
/**