This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f9b80aca9f [GOBBLIN-2151] ignore flows that are running beyond job
start and flow finish deadline when doing the concurrency check (#4050)
f9b80aca9f is described below
commit f9b80aca9fd010acfd81c04df835d1f68e442cec
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Sep 11 12:05:55 2024 -0700
[GOBBLIN-2151] ignore flows that are running beyond job start and flow
finish deadline when doing the concurrency check (#4050)
---
.../apache/gobblin/service/FlowConfigsV2Test.java | 3 +-
.../monitoring/FlowStatusGeneratorTest.java | 4 +-
.../orchestration/DagManagementStateStore.java | 6 +
.../MySqlDagManagementStateStore.java | 7 +
.../utils/FlowCompilationValidationHelper.java | 105 ++++++++++++--
.../monitoring/MysqlJobStatusRetriever.java | 2 +-
.../modules/orchestration/OrchestratorTest.java | 3 +-
.../utils/FlowCompilationValidationHelperTest.java | 160 ++++++++++++++++++++-
8 files changed, 272 insertions(+), 18 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
index 01e9d77f8f..07b7c5070d 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigsV2Test.java
@@ -132,8 +132,7 @@ public class FlowConfigsV2Test {
binder.bind(GroupOwnershipService.class).toInstance(groupOwnershipService);
});
- _server = EmbeddedRestliServer.builder().resources(
-
Lists.newArrayList(FlowConfigsV2Resource.class)).injector(injector).build();
+ _server =
EmbeddedRestliServer.builder().resources(Lists.newArrayList(FlowConfigsV2Resource.class)).injector(injector).build();
_server.startAsync();
_server.awaitRunning();
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 00ad043df8..deeffe98d3 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -58,7 +58,7 @@ public class FlowStatusGeneratorTest {
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
- FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+ FlowStatus flowStatus = new FlowStatus(flowName, flowGroup,
flowExecutionId, jobStatusIterator, ExecutionStatus.COMPILED);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
@@ -77,7 +77,7 @@ public class FlowStatusGeneratorTest {
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
- FlowStatus flowStatus = new
FlowStatus(flowName,flowGroup,flowExecutionId,jobStatusIterator,ExecutionStatus.COMPILED);
+ FlowStatus flowStatus = new FlowStatus(flowName, flowGroup,
flowExecutionId, jobStatusIterator, ExecutionStatus.COMPILED);
when(jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName)).thenReturn(
Lists.newArrayList(flowStatus));
FlowStatusGenerator flowStatusGenerator = new
FlowStatusGenerator(jobStatusRetriever);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index d936f0129b..fb7b23fdf0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -142,6 +143,11 @@ public interface DagManagementStateStore {
*/
Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
+ /**
+ * Returns the {@link org.apache.gobblin.service.monitoring.FlowStatus}es recorded for the provided flow group and
+ * flow name.
+ *
+ * @param flowGroup flow group of the flow
+ * @param flowName flow name of the flow
+ * @return list of {@link org.apache.gobblin.service.monitoring.FlowStatus} for the provided flow group and flow name.
+ */
+ List<org.apache.gobblin.service.monitoring.FlowStatus>
getAllFlowStatusesForFlow(String flowGroup, String flowName);
+
/**
* Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
* @param flowGroup flow group for the dag action
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index adfe5157d4..d10302000a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -21,6 +21,7 @@ import java.net.URI;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -43,6 +44,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -211,6 +213,11 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
}
}
+ /**
+ * Delegates to {@link JobStatusRetriever#getAllFlowStatusesForFlowExecutionsOrdered(String, String)}, so the
+ * returned list and its ordering are exactly what that retriever produces.
+ */
+ @Override
+ public List<FlowStatus> getAllFlowStatusesForFlow(String flowGroup, String
flowName) {
+ return
this.jobStatusRetriever.getAllFlowStatusesForFlowExecutionsOrdered(flowGroup,
flowName);
+ }
+
@Override
public boolean existsJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 473ff3cf1a..3763658da0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.service.modules.utils;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -38,16 +40,23 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import org.apache.gobblin.service.modules.orchestration.DagUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
/**
* Helper class with functionality meant to be re-used between the
LaunchDagProc and Orchestrator when launching
@@ -69,12 +78,12 @@ public class FlowCompilationValidationHelper {
private final SpecCompiler specCompiler;
private final UserQuotaManager quotaManager;
private final EventSubmitter eventSubmitter;
- private final FlowStatusGenerator flowStatusGenerator;
+ private final DagManagementStateStore dagManagementStateStore;
private final boolean isFlowConcurrencyEnabled;
@Inject
public FlowCompilationValidationHelper(Config config,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
- UserQuotaManager userQuotaManager, FlowStatusGenerator
flowStatusGenerator) {
+ UserQuotaManager userQuotaManager, DagManagementStateStore
dagManagementStateStore) {
try {
String specCompilerClassName = ConfigUtils.getString(config,
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
@@ -88,7 +97,7 @@ public class FlowCompilationValidationHelper {
this.quotaManager = userQuotaManager;
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.specCompiler.getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext,
"org.apache.gobblin.service").build();
- this.flowStatusGenerator = flowStatusGenerator;
+ this.dagManagementStateStore = dagManagementStateStore;
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
}
@@ -156,9 +165,9 @@ public class FlowCompilationValidationHelper {
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
- if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName,
allowConcurrentExecution,
-
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))))
{
- return Optional.fromNullable(jobExecutionPlanDag);
+ if (isExecutionPermitted(flowGroup, flowName,
+
Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)),
allowConcurrentExecution)) {
+ return Optional.of(jobExecutionPlanDag);
} else {
log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup,
flowName);
@@ -167,7 +176,7 @@ public class FlowCompilationValidationHelper {
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
if (!flowSpec.isScheduled()) {
// For ad-hoc flow, we might already increase quota, we need to
decrease here
- for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getStartNodes()) {
quotaManager.releaseQuota(dagNode);
}
}
@@ -187,9 +196,85 @@ public class FlowCompilationValidationHelper {
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowGroup, String flowName,
- boolean allowConcurrentExecution, long flowExecutionId) {
- return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+ private boolean isExecutionPermitted(String flowGroup, String flowName, long
flowExecutionId, boolean allowConcurrentExecution)
+ throws IOException {
+ // The state store is consulted only when concurrent executions are disallowed (|| short-circuits).
+ // "Running" is decided by isPriorFlowExecutionRunning, which ignores the execution with this same
+ // flowExecutionId and prior executions that are past their job start / flow finish deadlines, so the
+ // "no other instance of the flow is currently RUNNING" wording in the javadoc is an approximation.
+ return allowConcurrentExecution || !isPriorFlowExecutionRunning(flowGroup,
flowName, flowExecutionId, dagManagementStateStore);
+ }
+
+  /**
+   * Returns true if any previous execution of the flow identified by the provided flowGroup and flowName is still
+   * considered running.
+   * <p>
+   * The execution that has the provided flowExecutionId is ignored, so that if the first attempt of some LaunchDagProc
+   * fails to complete the lease after creating a Dag and storing it, a second attempt can continue the unfinished work
+   * without thinking that the flow is already running.
+   * <p>
+   * Executions with a finished (or $UNKNOWN) status are ignored, and so are non-terminal executions whose Dag is no
+   * longer present in the {@link DagManagementStateStore}. A COMPILED or PENDING execution counts as running only
+   * until flowStartTime + job start deadline; a RUNNING or PENDING_RESUME execution only until
+   * flowStartTime + flow finish deadline. Executions running beyond those deadlines are ignored.
+   * <p>
+   * If this method returns `false`, callers may start a flow and subsequent calls to this method may return `true`.
+   *
+   * @param flowGroup flow group of the flow
+   * @param flowName flow name of the flow
+   * @param flowExecutionId execution id being launched; its own flow status is ignored
+   * @param dagManagementStateStore source of the flow statuses and of the Dags of prior executions
+   * @return true if some other execution is non-terminal, still has a Dag and is within its deadline
+   * @throws IOException if reading a prior execution's state fails (presumably from
+   *     {@code DagManagementStateStore#getDag}; TODO confirm)
+   */
+  @VisibleForTesting
+  static boolean isPriorFlowExecutionRunning(String flowGroup, String flowName, long flowExecutionId,
+      DagManagementStateStore dagManagementStateStore) throws IOException {
+    List<FlowStatus> flowStatusList = dagManagementStateStore.getAllFlowStatusesForFlow(flowGroup, flowName);
+
+    if (flowStatusList == null || flowStatusList.isEmpty()) {
+      return false;
+    }
+
+    for (FlowStatus flowStatus : flowStatusList) {
+      ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
+      if (flowStatus.getFlowExecutionId() == flowExecutionId) {
+        // A duplicate call to this method for the same execution indicates that the prior caller could not complete
+        // the required action, so we ignore any flow status for the current execution to give the caller another
+        // chance to complete it. This should be rare, so let's log it.
+        if (flowExecutionStatus == COMPILED) {
+          log.info("A previous execution with the same flowExecutionId found {}. Previous execution may not be "
+              + "successfully submitted.", flowStatus);
+        } else if (flowExecutionStatus == RUNNING) {
+          log.error("A previous execution with the same flowExecutionId found {}. This is a rare case of previous "
+              + "execution getting submitted but then LaunchDagProc failed to complete the lease", flowStatus);
+        } else {
+          log.warn("A previous execution with the same flowExecutionId and an unexpected status is found {}.",
+              flowStatus);
+        }
+        continue;
+      }
+
+      log.debug("Verifying if {} is running...", flowStatus);
+
+      if (FlowStatusGenerator.FINISHED_STATUSES.contains(flowExecutionStatus.name())
+          || flowExecutionStatus == $UNKNOWN) {
+        // ignore finished entries
+        // todo - make changes so `getAllFlowStatusesForFlow` never returns $UNKNOWN flow status
+      } else if (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING
+          || flowExecutionStatus == PENDING_RESUME || flowExecutionStatus == RUNNING) {
+        // These are the only four non-terminal statuses that a flow can have; jobs have two more non-terminal
+        // statuses, ORCHESTRATED and PENDING_RETRY.
+        Dag.DagId dagIdOfOldExecution = new Dag.DagId(flowGroup, flowName, flowStatus.getFlowExecutionId());
+        java.util.Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagIdOfOldExecution);
+
+        if (!dag.isPresent()) {
+          // Log the dag id and status so the stale entry can be identified.
+          log.error("Dag {} is finished and cleaned up, job status monitor somehow did not receive/update the flow "
+              + "status {}. Ignoring it here...", dagIdOfOldExecution, flowStatus);
+          continue;
+        }
+
+        // NOTE(review): assumes the Dag has at least one node; get(0) throws if the node list is empty.
+        Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+        long flowStartTime = DagUtils.getFlowStartTime(dagNode);
+        // Both deadlines are durations measured from flowStartTime (see the comparisons below).
+        long jobStartDeadline =
+            DagUtils.getJobStartDeadline(dagNode, DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());
+        long flowFinishDeadline = DagUtils.getFlowFinishDeadline(dagNode);
+        // Sample the clock once so both deadline checks use the same "now".
+        long now = System.currentTimeMillis();
+        // Not-yet-started executions are bounded by the job start deadline ...
+        boolean withinJobStartDeadline = (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING)
+            && now < flowStartTime + jobStartDeadline;
+        // ... and started (or resuming) executions by the flow finish deadline.
+        boolean withinFlowFinishDeadline = (flowExecutionStatus == RUNNING || flowExecutionStatus == PENDING_RESUME)
+            && now < flowStartTime + flowFinishDeadline;
+        if (withinJobStartDeadline || withinFlowFinishDeadline) {
+          log.info("{} is still running. Found a dag for this, flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
+              flowStatus, flowStartTime, jobStartDeadline, flowFinishDeadline);
+          return true;
+        } else {
+          // Log the dag id rather than the Optional wrapper, which would print as "Optional[...]".
+          log.warn("Dag {} is still running beyond deadline! flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
+              dagIdOfOldExecution, flowStartTime, jobStartDeadline, flowFinishDeadline);
+        }
+      } else {
+        log.error("Unknown status {}", flowExecutionStatus);
+      }
+    }
+
+    return false;
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 81d8776c53..5678fef6aa 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -98,7 +98,7 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
@Override
public List<FlowStatus> getAllFlowStatusesForFlowExecutionsOrdered(String
flowGroup, String flowName) {
String storeName = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup,
flowName);
- List<State> jobStatusStates = timeOpAndWrapIOException(() ->
this.stateStore.getAllWithPrefix(storeName),
+ List<State> jobStatusStates = timeOpAndWrapIOException(() ->
this.stateStore.getAll(storeName),
GET_LATEST_FLOW_GROUP_STATUS_METRIC);
return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup,
jobStatusStates,null));
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 9652e0faac..ee5f14cb87 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -59,7 +59,6 @@ import org.apache.gobblin.service.GobblinServiceManagerTest;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
@@ -124,7 +123,7 @@ public class OrchestratorTest {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
FlowCompilationValidationHelper flowCompilationValidationHelper = new
FlowCompilationValidationHelper(ConfigFactory.empty(),
- sharedFlowMetricsSingleton, mock(UserQuotaManager.class),
mock(FlowStatusGenerator.class));
+ sharedFlowMetricsSingleton, mock(UserQuotaManager.class),
dagManagementStateStore);
this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, Optional.of(logger),
mock(FlowLaunchHandler.class), sharedFlowMetricsSingleton,
dagManagementStateStore,
flowCompilationValidationHelper, mock(JobStatusRetriever.class));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
index 349b161ad8..c865cae14a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
@@ -17,17 +17,45 @@
package org.apache.gobblin.service.modules.utils;
+import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTestUtils;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatus;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
/**
@@ -36,12 +64,19 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
public class FlowCompilationValidationHelperTest {
private final Long jobSpecFlowExecutionId = 1234L;
private Dag<JobExecutionPlan> jobExecutionPlanDag;
+ private MySqlDagManagementStateStore dagManagementStateStore;
@BeforeClass
- public void setup() throws URISyntaxException {
+ public void setup() throws Exception {
+ // Builds, once per class, the Dag (from jobSpecFlowExecutionId) stored in jobExecutionPlanDag for the tests below.
+ // NOTE(review): `throws Exception` looks broader than needed; this previously declared only URISyntaxException.
String dagId = "testDag";
jobExecutionPlanDag = DagTestUtils.buildDag(dagId,
jobSpecFlowExecutionId);
+ }
+ /**
+ * Recreates the spied {@link MySqlDagManagementStateStore} before every test method, so stubs set by one test
+ * (e.g. on getAllFlowStatusesForFlow) do not carry over to the next.
+ * NOTE(review): testMetastoreDatabase is never closed here; confirm whether instances from
+ * TestMetastoreDatabaseFactory need closing, and whether Dags added by earlier tests persist in that database.
+ */
+ @BeforeMethod
+ public void resetDMSS() throws Exception {
+ ITestMetastoreDatabase testMetastoreDatabase =
TestMetastoreDatabaseFactory.get();
+ this.dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
+ LaunchDagProcTest.mockDMSSCommonBehavior(this.dagManagementStateStore);
}
/*
@@ -66,4 +101,127 @@ public class FlowCompilationValidationHelperTest {
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
existingFlowExecutionId);
}
+
+  /**
+   * A prior execution in a non-terminal status (COMPILED) for which this test adds no Dag must not be reported as
+   * running. The check is made for a different, newer flowExecutionId: passing the prior execution's own id would hit
+   * the same-execution-id branch instead and never reach the missing-Dag path this test is named after.
+   */
+  @Test
+  public void testConcurrentFlowPreviousFlowWithNonTerminalStatusWithNoDag() throws IOException {
+    List<FlowStatus> list = new ArrayList<>();
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long previousFlowExecutionId = 12345L;
+    long currentFlowExecutionId = previousFlowExecutionId + 1;
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName)
+        .flowExecutionId(previousFlowExecutionId).jobName(JobStatusRetriever.NA_KEY)
+        .jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    list.add(new FlowStatus(flowName, flowGroup, previousFlowExecutionId, jobStatusIterator,
+        ExecutionStatus.COMPILED));
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(list);
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup, flowName,
+        currentFlowExecutionId, this.dagManagementStateStore));
+  }
+
+ /**
+ * A prior PENDING execution whose job start deadline (10 ms) has already elapsed must not count as running.
+ */
+ @Test
+ public void
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningBeyondJobStartDeadline()
+ throws IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long jobStartDeadline = 10L;
+ // subtract one extra millisecond so the prior execution is already past its job start deadline even if the
+ // assertion below runs within the same millisecond
+ long flowStartTime = System.currentTimeMillis() - jobStartDeadline - 1;
+ long currentFlowExecutionId = System.currentTimeMillis() ;
+
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(jobStartDeadline)));
+
+
Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup,
flowName,
+ currentFlowExecutionId, this.dagManagementStateStore));
+ }
+
+ /**
+ * A prior PENDING_RESUME execution whose flow finish deadline (30 ms) has already elapsed must not count as running.
+ */
+ @Test
+ public void
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningBeyondFlowFinishDeadline()
+ throws IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowFinishDeadline = 30L;
+ // one extra millisecond of margin so the prior execution is already past its flow finish deadline
+ long flowStartTime = System.currentTimeMillis() - flowFinishDeadline - 1;
+ long currentFlowExecutionId = System.currentTimeMillis() ;
+
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING_RESUME,
+ ConfigFactory.empty()
+
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
+
+
Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup,
flowName,
+ currentFlowExecutionId, this.dagManagementStateStore));
+ }
+
+ /**
+ * A prior RUNNING execution that is still within its flow finish deadline (10 s) must count as running.
+ */
+ @Test
+ public void
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithinFlowFinishDeadline()
+ throws IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowFinishDeadline = 10000L;
+ long flowStartTime = System.currentTimeMillis() - 1 ; // started 1 ms ago: ~flowFinishDeadline - 1 ms remain
+ // before the prior execution passes its deadline; the 1 ms offset also keeps its execution id distinct from
+ // currentFlowExecutionId below
+ long currentFlowExecutionId = System.currentTimeMillis() ;
+
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.RUNNING,
+ ConfigFactory.empty()
+
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
+
+
Assert.assertTrue(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup,
flowName,
+ currentFlowExecutionId, this.dagManagementStateStore));
+ }
+
+ /**
+ * With no flow statuses returned by the state store, no prior execution can be running.
+ * NOTE(review): the flow inserted below (configured with flowStartTime as its job start deadline) is irrelevant,
+ * because the getAllFlowStatusesForFlow stub is replaced with an empty list before the assertion.
+ */
+ @Test
+ public void testConcurrentFlowNoPreviousExecutionRunning() throws
IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowStartTime = System.currentTimeMillis(); // also used as the flowExecutionId checked below
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.PENDING,
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(flowStartTime)));
+
+ // change the mock to not return any previous flow status
+ when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(),
anyString())).thenReturn(Collections.emptyList());
+
+
Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup,
flowName,
+ flowStartTime, this.dagManagementStateStore));
+ }
+
+ /**
+ * A COMPILED status for the very execution being checked (a retried launch) is ignored, even within its job start
+ * deadline, so the execution is not reported as already running.
+ */
+ @Test
+ public void testSameFlowExecAlreadyCompiledWithinJobStartDeadline() throws
IOException, URISyntaxException {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long jobStartDeadline = 10000L;
+ long flowStartTime = System.currentTimeMillis();
+
+ insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime,
ExecutionStatus.COMPILED,
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME,
ConfigValueFactory.fromAnyRef(jobStartDeadline)));
+
+ // flowStartTime = currentFlowExecutionId: the inserted status belongs to the execution being checked
+
Assert.assertFalse(FlowCompilationValidationHelper.isPriorFlowExecutionRunning(flowGroup,
flowName,
+ flowStartTime, this.dagManagementStateStore));
+ }
+
+ /**
+ * Stubs getAllFlowStatusesForFlow (for any flow group / flow name) to return a single FlowStatus with the given
+ * status and flowStartTime as its flow execution id. Also builds a Dag for that execution via
+ * DagManagerTest.buildDag (presumably keyed by flowStartTime as its flow execution id; verify against buildDag),
+ * sets every node's flow start time to flowStartTime, and adds the Dag to the state store.
+ *
+ * @param config extra job config (e.g. deadline settings); flow group, flow name and spec executor URI are added to it
+ */
+ private void insertFlowIntoDMSSMock(String flowGroup, String flowName, long
flowStartTime, ExecutionStatus executionStatus, Config config)
+ throws URISyntaxException, IOException {
+ List<FlowStatus> list = new ArrayList<>();
+ // flowStartTime doubles as the flow execution id of the inserted execution
+ JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowStartTime)
+
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(executionStatus.name()).build();
+ Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
+ list.add(new FlowStatus(flowName, flowGroup, flowStartTime,
jobStatusIterator, executionStatus));
+ when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(),
anyString())).thenReturn(list);
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowStartTime,
+ DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5,
"user5", config
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ // isPriorFlowExecutionRunning measures the deadlines from the node's flow start time
+ dag.getNodes().forEach(node ->
node.getValue().setFlowStartTime(flowStartTime));
+ this.dagManagementStateStore.addDag(dag);
+ }
}