This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2837df2d5 [GOBBLIN-2069] implement dag procs to enforce job start
deadline and dag completion deadline (#3950)
2837df2d5 is described below
commit 2837df2d53d860baf461bfb3d542d704663079a6
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed May 22 17:52:46 2024 -0700
[GOBBLIN-2069] implement dag procs to enforce job start deadline and dag
completion deadline (#3950)
* add EnforceStartDeadlineDagProc and EnforceFinishDeadlineDagProc dag procs
* fix tests
* address review comments
* add a todo as a workaround for a bug
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +-
.../src/main/avro/DagActionStoreChangeEvent.avsc | 4 +-
gobblin-modules/gobblin-kafka-09/build.gradle | 2 +-
.../runtime/KafkaAvroJobStatusMonitorTest.java | 9 ++
.../service/monitoring/FlowStatusGenerator.java | 3 +-
gobblin-service/build.gradle | 2 +-
.../modules/core/GobblinServiceGuiceModule.java | 16 ++-
.../modules/core/GobblinServiceManager.java | 24 ++--
.../orchestration/DagActionReminderScheduler.java | 14 +-
.../modules/orchestration/DagActionStore.java | 22 +++-
.../orchestration/DagManagementTaskStreamImpl.java | 73 +++++++++--
.../service/modules/orchestration/DagManager.java | 5 +-
.../modules/orchestration/DagManagerUtils.java | 19 +--
.../modules/orchestration/DagProcFactory.java | 15 +++
.../modules/orchestration/DagProcessingEngine.java | 14 +-
.../modules/orchestration/DagTaskVisitor.java | 6 +-
.../modules/orchestration/FlowLaunchHandler.java | 9 +-
.../MysqlMultiActiveLeaseArbiter.java | 10 +-
.../modules/orchestration/proc/DagProcUtils.java | 75 +++++++++++
.../proc/EnforceFlowFinishDeadlineDagProc.java | 88 +++++++++++++
.../proc/EnforceJobStartDeadlineDagProc.java | 104 +++++++++++++++
.../modules/orchestration/proc/KillDagProc.java | 52 +-------
.../orchestration/proc/ReevaluateDagProc.java | 26 +++-
.../task/EnforceFlowFinishDeadlineDagTask.java | 39 ++++++
.../task/EnforceJobStartDeadlineDagTask.java | 39 ++++++
.../service/modules/spec/JobExecutionPlan.java | 28 ++++
.../DagManagementDagActionStoreChangeMonitor.java | 3 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 52 +++++++-
.../gobblin/service/GobblinServiceManagerTest.java | 12 +-
.../DagManagementTaskStreamImplTest.java | 2 +-
.../orchestration/DagProcessingEngineTest.java | 5 +-
.../proc/EnforceDeadlineDagProcsTest.java | 144 +++++++++++++++++++++
.../orchestration/proc/ReevaluateDagProcTest.java | 25 ++++
.../service/monitoring/JobStatusRetrieverTest.java | 2 +-
.../monitoring/MysqlJobStatusRetrieverTest.java | 4 +-
gradle/scripts/dependencyDefinitions.gradle | 1 +
36 files changed, 815 insertions(+), 135 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 93acece84..309a451f1 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1072,7 +1072,7 @@ public class ConfigurationKeys {
public static final String GOBBLIN_JOB_START_SLA_TIME =
"gobblin.job.start.sla.time";
public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT =
"gobblin.job.start.sla.timeunit";
public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
- public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT =
"MINUTES";
+ public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT =
TimeUnit.MINUTES.name();
public static final String DATASET_SUBPATHS_KEY =
"gobblin.flow.dataset.subPaths";
public static final String DATASET_BASE_INPUT_PATH_KEY =
"gobblin.flow.dataset.baseInputPath";
public static final String DATASET_BASE_OUTPUT_PATH_KEY =
"gobblin.flow.dataset.baseOutputPath";
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index c6d94a628..84ca4cbc5 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -34,9 +34,11 @@
"type": "enum",
"name": "DagActionValue",
"symbols": [
+ "ENFORCE_FLOW_FINISH_DEADLINE",
+ "ENFORCE_JOB_START_DEADLINE",
"KILL",
- "RESUME",
"LAUNCH",
+ "RESUME",
"REEVALUATE"
],
"symbolDocs": {
diff --git a/gobblin-modules/gobblin-kafka-09/build.gradle
b/gobblin-modules/gobblin-kafka-09/build.gradle
index 9dd1604d9..58cfb3bbb 100644
--- a/gobblin-modules/gobblin-kafka-09/build.gradle
+++ b/gobblin-modules/gobblin-kafka-09/build.gradle
@@ -56,7 +56,7 @@ dependencies {
testCompile project(path: ":gobblin-metastore", configuration:
"testFixtures")
testCompile project(":gobblin-test-utils")
testCompile externalDependency.jsonAssert
- testCompile externalDependency.mockito
+ testCompile externalDependency.mockitoInline
testCompile externalDependency.testng
testCompile(externalDependency.kafka09Test){
exclude group: "com.sun.jmx", module: "jmxri"
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 647b5cfe1..1d1520d5d 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -71,6 +73,9 @@ import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueReposi
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -99,6 +104,7 @@ public class KafkaAvroJobStatusMonitorTest {
private MetricContext context;
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
private MysqlDagActionStore mysqlDagActionStore;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
@BeforeClass
public void setUp() throws Exception {
@@ -118,6 +124,8 @@ public class KafkaAvroJobStatusMonitorTest {
builder =
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
}
@Test
@@ -762,6 +770,7 @@ public class KafkaAvroJobStatusMonitorTest {
public void tearDown() {
try {
this.kafkaTestHelper.close();
+ this.mockedGobblinServiceManager.close();
} catch(Exception e) {
System.err.println("Failed to close Kafka server.");
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 4755f8368..4307659b4 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -40,7 +40,8 @@ import org.apache.gobblin.service.ExecutionStatus;
*/
@Slf4j
public class FlowStatusGenerator {
- public static final List<String> FINISHED_STATUSES =
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+ public static final List<String> FINISHED_STATUSES =
Lists.newArrayList(ExecutionStatus.FAILED.name(),
+ ExecutionStatus.COMPLETE.name(), ExecutionStatus.CANCELLED.name());
public static final int MAX_LOOKBACK = 100;
private final JobStatusRetriever jobStatusRetriever;
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index af029ad5c..80b2d7e39 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -90,7 +90,7 @@ dependencies {
testRuntime externalDependency.derby
testCompile externalDependency.hamcrest
testCompile externalDependency.jhyde
- testCompile externalDependency.mockito
+ testCompile externalDependency.mockitoInline
testCompile externalDependency.testContainers
testCompile externalDependency.testContainersMysql
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index b4d58e35e..2f7066f90 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -39,12 +39,7 @@ import javax.inject.Singleton;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.restli.EmbeddedRestliServer;
-import
org.apache.gobblin.service.modules.orchestration.DagActionProcessingMultiActiveLeaseArbiterFactory;
-import
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
-import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
@@ -70,7 +65,9 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+import
org.apache.gobblin.service.modules.orchestration.DagActionProcessingMultiActiveLeaseArbiterFactory;
import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
@@ -78,10 +75,14 @@ import
org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
+import
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
@@ -323,6 +324,9 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(D2Announcer.class).to(NoopD2Announcer.class);
+
binder.bindConstant().annotatedWith(Names.named(DagProcessingEngine.DEFAULT_JOB_START_DEADLINE_TIME_MS)).to(
+
DagProcUtils.getDefaultJobStartDeadline(serviceConfig.getInnerConfig()));
+
LOGGER.info("Bindings configured");
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 0b8a09843..827ac961e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -117,7 +117,6 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
public static final String SERVICE_EVENT_BUS_NAME =
"GobblinServiceManagerEventBus";
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinServiceManager.class);
- @Setter private static volatile GobblinServiceGuiceModule
GOBBLIN_SERVICE_GUICE_MODULE;
protected final ServiceBasedAppLauncher serviceLauncher;
private volatile boolean stopInProgress = false;
@@ -168,6 +167,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
@Inject
@Getter
private Injector injector;
+ @Getter @Setter private volatile static Injector staticInjector;
protected boolean flowCatalogLocalCommit;
@@ -257,27 +257,27 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
/**
* Uses the provided serviceConfiguration to create a new Guice module and
obtain a new class associated with it.
- * This method should only be called once per application.
*/
public static GobblinServiceManager create(GobblinServiceConfiguration
serviceConfiguration) {
- GOBBLIN_SERVICE_GUICE_MODULE = new
GobblinServiceGuiceModule(serviceConfiguration);
- return getClass(GobblinServiceManager.class);
+ if (GobblinServiceManager.staticInjector == null) {
+ GobblinServiceManager.staticInjector =
Guice.createInjector(Stage.DEVELOPMENT, new
GobblinServiceGuiceModule(serviceConfiguration));
+ }
+ return getClass(GobblinServiceManager.staticInjector,
GobblinServiceManager.class);
}
/**
- * If {@link GobblinServiceManager} is created using guice, user should set
{@link GobblinServiceManager#GOBBLIN_SERVICE_GUICE_MODULE}
- * for this method to work.
+ * This method assumes that {@link GobblinServiceManager} is created using
guice, which sets the injector.
+ * If it is created through other ways, caller should use {@link
GobblinServiceManager#create(GobblinServiceConfiguration)}
+ * or provide the {@link Injector} using {@link
GobblinServiceManager#getClass(Injector, Class)} method instead.
* @param classToGet
* @return a new object if the class type is not marked with @Singleton,
otherwise the same instance of the class
* @param <T>
*/
public static <T> T getClass(Class<T> classToGet) {
- if (GOBBLIN_SERVICE_GUICE_MODULE == null) {
- throw new RuntimeException(String.format("getClass called to obtain %s
without calling create method to "
- + "initialize GobblinServiceGuiceModule.", classToGet));
- }
- // Use development stage to enable more verbose error messages and runtime
checks
- Injector injector = Guice.createInjector(Stage.DEVELOPMENT,
GOBBLIN_SERVICE_GUICE_MODULE);
+ return getClass(getStaticInjector(), classToGet);
+ }
+
+ public static <T> T getClass(Injector injector, Class<T> classToGet) {
return injector.getInstance(classToGet);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 909d19f37..588db58d6 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -33,6 +33,7 @@ import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -44,6 +45,7 @@ import
org.apache.gobblin.service.modules.core.GobblinServiceManager;
* {#scheduleReminderJob} on a flow action that it failed to acquire a lease
on but has not yet completed. The reminder
* will fire once the previous lease owner's lease is expected to expire.
*/
+@Singleton
public class DagActionReminderScheduler {
public static final String DAG_ACTION_REMINDER_SCHEDULER_KEY =
"DagActionReminderScheduler";
private final Scheduler quartzScheduler;
@@ -86,18 +88,17 @@ public class DagActionReminderScheduler {
@Override
public void execute(JobExecutionContext context) {
// Get properties from the trigger to create a dagAction
- JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+ JobDataMap jobDataMap = context.getMergedJobDataMap();
String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
- String flowId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String flowExecutionId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
- + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+ + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName
+ ", dagActionType: " + dagActionType + ")");
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
- dagActionType);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType, true);
try {
DagManagement dagManagement =
GobblinServiceManager.getClass(DagManagement.class);
@@ -141,10 +142,9 @@ public class DagActionReminderScheduler {
*/
public static Trigger createReminderJobTrigger(DagActionStore.DagAction
dagAction, long reminderDurationMillis,
Supplier<Long> getCurrentTimeMillis) {
- Trigger trigger = TriggerBuilder.newTrigger()
+ return TriggerBuilder.newTrigger()
.withIdentity(createDagActionReminderKey(dagAction),
dagAction.getFlowGroup())
.startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
.build();
- return trigger;
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index a44f7b422..cdaf56c8a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
import java.util.Collection;
import lombok.Data;
+import lombok.RequiredArgsConstructor;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
@@ -30,21 +31,29 @@ import
org.apache.gobblin.service.modules.flowgraph.DagNodeId;
public interface DagActionStore {
public static final String NO_JOB_NAME_DEFAULT = "";
enum DagActionType {
+ CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
+ ENFORCE_JOB_START_DEADLINE, // Enforce job start deadline
+ ENFORCE_FLOW_FINISH_DEADLINE, // Enforce flow finish deadline
KILL, // Kill invoked through API call
- RESUME, // Resume flow invoked through API call
LAUNCH, // Launch new flow execution invoked adhoc or through scheduled
trigger
+ REEVALUATE, // Re-evaluate what needs to be done upon receipt of a final
job status
+ RESUME, // Resume flow invoked through API call
RETRY, // Invoked through DagManager for flows configured to allow retries
- CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
- REEVALUATE // Re-evaluate what needs to be done upon receipt of a final
job status
}
@Data
+ @RequiredArgsConstructor
class DagAction {
final String flowGroup;
final String flowName;
final String flowExecutionId;
final String jobName;
final DagActionType dagActionType;
+ final boolean isReminder;
+
+ public DagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType) {
+ this(flowGroup, flowName, flowExecutionId, jobName, dagActionType,
false);
+ }
public static DagAction forFlow(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) {
return new DagAction(flowGroup, flowName, flowExecutionId,
NO_JOB_NAME_DEFAULT, dagActionType);
@@ -69,6 +78,13 @@ public interface DagActionStore {
return new DagNodeId(this.flowGroup, this.flowName,
Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
}
+
+ /**
+ * Creates and returns a {@link DagManager.DagId} for this DagAction.
+ */
+ public DagManager.DagId getDagId() {
+ return new DagManager.DagId(this.flowGroup, this.flowName,
this.flowExecutionId);
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index e211906a6..1d32cde39 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -27,6 +27,7 @@ import org.quartz.SchedulerException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import javax.inject.Named;
@@ -39,11 +40,15 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
@@ -77,15 +82,16 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
private final boolean isMultiActiveExecutionEnabled;
- @Inject
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new
LinkedBlockingQueue<>();
+ private final DagManagementStateStore dagManagementStateStore;
@Inject
public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore>
dagActionStore,
@Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME)
MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
Optional<DagActionReminderScheduler> dagActionReminderScheduler,
- @Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean
isMultiActiveExecutionEnabled) {
+ @Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean
isMultiActiveExecutionEnabled,
+ DagManagementStateStore dagManagementStateStore) {
this.config = config;
if (!dagActionStore.isPresent()) {
/* DagActionStore is optional because there are other configurations
that do not require it and it's initialized
@@ -102,12 +108,13 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext,
"org.apache.gobblin.service").build();
+ this.dagManagementStateStore = dagManagementStateStore;
}
@Override
public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
// TODO: Used to track missing dag issue, remove later as needed
- log.info("Add dagAction{}", dagAction);
+ log.info("Add dagAction {}", dagAction);
if (!this.dagActionQueue.offer(dagAction)) {
throw new RuntimeException("Could not add dag action " + dagAction + "
to the queue");
@@ -124,9 +131,22 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
while (true) {
try {
DagActionStore.DagAction dagAction = this.dagActionQueue.take();
- LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagAction);
- if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
- return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+ /* Create triggers for original (non-reminder) dag actions of type
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE.
+ Reminder triggers are used to inform hosts once the job start
deadline and flow finish deadline are passed;
+ then only is lease arbitration done to enforce the deadline
violation and fail the job or flow if needed */
+ if (!dagAction.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+ createJobStartDeadlineTrigger(dagAction);
+ } else if (!dagAction.isReminder() && dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
+ createFlowFinishDeadlineTrigger(dagAction);
+ } else if (!dagAction.isReminder
+ || dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE
+ || dagAction.dagActionType ==
DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE) {
+ // todo - fix bug of a reminder event getting a lease even when
the first attempt succeeded.
+ // for now, avoid processing reminder events if they are not for
deadline dag actions
+ LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagAction);
+ if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
+ return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+ }
}
} catch (Exception e) {
//TODO: need to handle exceptions gracefully
@@ -135,6 +155,37 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
}
}
+ private void createJobStartDeadlineTrigger(DagActionStore.DagAction
dagAction) throws SchedulerException, IOException {
+ long timeOutForJobStart =
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
+ dagAction.getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ // todo - this timestamp is just an approximation, the real job submission
has happened in past, and that is when a
+ // ENFORCE_JOB_START_DEADLINE dag action was created; we are just
processing that dag action here
+ long jobSubmissionTime = System.currentTimeMillis();
+ long reminderDuration = jobSubmissionTime + timeOutForJobStart -
System.currentTimeMillis();
+
+ dagActionReminderScheduler.get().scheduleReminder(dagAction,
reminderDuration);
+ }
+
+ private void createFlowFinishDeadlineTrigger(DagActionStore.DagAction
dagAction) throws SchedulerException, IOException {
+ long timeOutForJobFinish;
+ Dag.DagNode<JobExecutionPlan> dagNode =
this.dagManagementStateStore.getDag(dagAction.getDagId()).get().getNodes().get(0);
+
+ try {
+ timeOutForJobFinish = DagManagerUtils.getFlowSLA(dagNode);
+ } catch (ConfigException e) {
+ log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid
format, using default SLA of {}",
+
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+ DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+ timeOutForJobFinish = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+ }
+
+ long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+ long reminderDuration = flowStartTime + timeOutForJobFinish -
System.currentTimeMillis();
+
+ dagActionReminderScheduler.get().scheduleReminder(dagAction,
reminderDuration);
+ }
+
/**
* Returns a {@link LeaseAttemptStatus} associated with the
* `dagAction` by calling
@@ -148,7 +199,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
throws IOException, SchedulerException {
// TODO: need to handle reminder events and flag them
LeaseAttemptStatus leaseAttemptStatus =
this.dagActionProcessingLeaseArbiter
- .tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
+ .tryAcquireLease(dagAction, System.currentTimeMillis(),
dagAction.isReminder, false);
/* Schedule a reminder for the event unless the lease has been
completed to safeguard against the case where even
we, when we might become the lease owner still fail to complete
processing
*/
@@ -162,12 +213,16 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
DagActionStore.DagActionType dagActionType = dagAction.getDagActionType();
switch (dagActionType) {
+ case ENFORCE_FLOW_FINISH_DEADLINE:
+ return new EnforceFlowFinishDeadlineDagTask(dagAction,
leaseObtainedStatus, dagActionStore.get());
+ case ENFORCE_JOB_START_DEADLINE:
+ return new EnforceJobStartDeadlineDagTask(dagAction,
leaseObtainedStatus, dagActionStore.get());
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
case LAUNCH:
return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
case REEVALUATE:
return new ReevaluateDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
- case KILL:
- return new KillDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
case RESUME:
return new ResumeDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
default:
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index ce942cd7a..51a846cb8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -138,8 +138,9 @@ public class DagManager extends AbstractIdleService {
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
public static final String DAG_MANAGER_HEARTBEAT =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"dagManager.heartbeat-%s";
// Default job start SLA time if configured, measured in minutes. Default is
10 minutes
- private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
- private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+ // todo - rename "sla" -> "deadline", and move them to DagProcUtils; they are public because DagProcUtils.getDefaultJobStartDeadline reads them
+ public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+ public static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX +
ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private static final int INITIAL_HOUSEKEEPING_THREAD_DELAY = 2;
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index c4b62b5ec..e2fc6a179 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -96,7 +96,7 @@ public class DagManagerUtils {
     return getFlowExecId(dagNode.getValue().getJobSpec());
   }
 
-  static long getFlowExecId(JobSpec jobSpec) {
+  public static long getFlowExecId(JobSpec jobSpec) {
     return jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
   }
 
@@ -287,15 +287,17 @@ public class DagManagerUtils {
     dagNode.getValue().setCurrentGeneration(dagNode.getValue().getCurrentGeneration() + 1);
   }
 
   /**
    * Flow start time is the same as the flow execution id which is the timestamp flow request was received, unless it
    * is a resumed flow, in which case it is {@link JobExecutionPlan#getFlowStartTime()}
    * @param dagNode dag node in context
    * @return flow start time
    */
-  static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
+  public static long getFlowStartTime(DagNode<JobExecutionPlan> dagNode) {
+    // a 0 start time presumably means none was recorded (per the javadoc only resumed flows carry one); without this
+    // fallback, deadline checks of the form "now > flowStartTime + sla" would treat the flow as overdue immediately
     long flowStartTime = dagNode.getValue().getFlowStartTime();
     return flowStartTime == 0L ? getFlowExecId(dagNode) : flowStartTime;
   }
 
   /**
@@ -304,7 +306,7 @@ public class DagManagerUtils {
    * @param dagNode dag node for which sla is to be retrieved
    * @return sla if it is provided, DEFAULT_FLOW_SLA_MILLIS otherwise
    */
-  static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
+  public static long getFlowSLA(DagNode<JobExecutionPlan> dagNode) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
         jobConfig, ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigurationKeys.DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT));
@@ -320,7 +322,7 @@ public class DagManagerUtils {
    * @param dagNode dag node for which flow start sla is to be retrieved
    * @return job start sla in ms
    */
-  static long getJobStartSla(DagNode<JobExecutionPlan> dagNode, Long defaultJobStartSla) {
+  public static long getJobStartSla(DagNode<JobExecutionPlan> dagNode, Long defaultJobStartSla) {
     Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
     TimeUnit slaTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
         jobConfig, ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 0cc559541..51f3c6ad2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -22,11 +22,15 @@ import com.google.inject.Singleton;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import
org.apache.gobblin.service.modules.orchestration.proc.EnforceFlowFinishDeadlineDagProc;
+import
org.apache.gobblin.service.modules.orchestration.proc.EnforceJobStartDeadlineDagProc;
import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
import org.apache.gobblin.service.modules.orchestration.proc.ReevaluateDagProc;
import org.apache.gobblin.service.modules.orchestration.proc.ResumeDagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -51,6 +55,17 @@ public class DagProcFactory implements
DagTaskVisitor<DagProc> {
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
+ @Override
+ public DagProc meet(EnforceFlowFinishDeadlineDagTask
enforceFlowFinishDeadlineDagTask) { // NOTE(review): meet(LaunchDagTask) below declares the concrete LaunchDagProc return type; these two overloads could do the same
+ return new
EnforceFlowFinishDeadlineDagProc(enforceFlowFinishDeadlineDagTask);
+ }
+
+ @Override
+ public DagProc meet(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
+ return new EnforceJobStartDeadlineDagProc(enforceJobStartDeadlineDagTask);
+ }
+
+
@Override
public LaunchDagProc meet(LaunchDagTask launchDagTask) {
return new LaunchDagProc(launchDagTask,
this.flowCompilationValidationHelper);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index 464abdea1..611628e3f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.google.inject.name.Named;
import com.typesafe.config.Config;
import lombok.AllArgsConstructor;
@@ -61,10 +62,12 @@ public class DagProcessingEngine extends
AbstractIdleService {
private final Optional<DagProcFactory> dagProcFactory;
private ScheduledExecutorService scheduledExecutorPool;
private static final Integer TERMINATION_TIMEOUT = 30;
+ public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS =
"defaultJobStartDeadlineTimeMillis";
+ @Getter static long defaultJobStartSlaTimeMillis; // NOTE(review): static state assigned by each engine's constructor (last one wins) and read statically, e.g. by EnforceJobStartDeadlineDagProc; consider injecting the value instead
@Inject
- public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream,
- Optional<DagProcFactory> dagProcFactory,
Optional<DagManagementStateStore> dagManagementStateStore) {
+ public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream, Optional<DagProcFactory> dagProcFactory,
+ Optional<DagManagementStateStore> dagManagementStateStore,
@Named(DEFAULT_JOB_START_DEADLINE_TIME_MS) long deadlineTimeMs) {
this.config = config;
this.dagProcFactory = dagProcFactory;
this.dagTaskStream = dagTaskStream;
@@ -77,6 +80,11 @@ public class DagProcessingEngine extends AbstractIdleService
{
this.dagManagementStateStore.isPresent() ? "present" : "MISSING"));
}
log.info("DagProcessingEngine initialized.");
+ setDefaultJobStartDeadlineTimeMs(deadlineTimeMs);
+ }
+
+ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) {
+ defaultJobStartSlaTimeMillis = deadlineTimeMs;
}
@Override
@@ -113,8 +121,8 @@ public class DagProcessingEngine extends
AbstractIdleService {
@Override
public void run() {
+ log.info("Starting DagProcEngineThread to process dag tasks. Thread id:
{}", threadID);
while (true) {
- log.info("Starting DagProcEngineThread to process dag tasks. Thread
id: {}", threadID);
DagTask dagTask = dagTaskStream.next(); // blocking call
if (dagTask == null) {
//todo - add a metrics to count the times dagTask was null
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
index 9c36c8c11..0d91b8977 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service.modules.orchestration;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -24,8 +26,10 @@ import
org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
public interface DagTaskVisitor<T> {
+ T meet(EnforceFlowFinishDeadlineDagTask enforceFlowFinishDeadlineDagTask); // task created for ENFORCE_FLOW_FINISH_DEADLINE dag actions
+ T meet(EnforceJobStartDeadlineDagTask enforceJobStartDeadlineDagTask); // task created for ENFORCE_JOB_START_DEADLINE dag actions
+ T meet(KillDagTask killDagTask);
T meet(LaunchDagTask launchDagTask);
T meet(ReevaluateDagTask reevaluateDagTask);
- T meet(KillDagTask killDagTask);
T meet(ResumeDagTask resumeDagTask);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 1c37a404c..bb4eda416 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
@@ -110,7 +111,7 @@ public class FlowLaunchHandler {
LeaseAttemptStatus leaseAttempt =
this.multiActiveLeaseArbiter.tryAcquireLease(
dagAction, eventTimeMillis, isReminderEvent,
adoptConsensusFlowExecutionId);
if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
- && persistDagAction((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt)) {
+ && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseAttempt.getConsensusDagAction(),
((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt).getEventTimeMillis());
} else { // when NOT successfully `persistDagAction`, set a reminder to
re-attempt handling (unless leasing finished)
@@ -136,9 +137,11 @@ public class FlowLaunchHandler {
* Called after obtaining a lease to both persist to the {@link
DagActionStore} and
* {@link
MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
*/
- private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus
leaseStatus) {
+ private boolean
persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) { // renamed from persistDagAction; NOTE(review): the else-branch comment in the hunk above still says `persistDagAction`
+ DagActionStore.DagAction launchDagAction =
leaseStatus.getConsensusDagAction();
try {
- this.dagActionStore.addDagAction(leaseStatus.getConsensusDagAction());
+ this.dagActionStore.addDagAction(launchDagAction);
+ DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(launchDagAction); // also stores an ENFORCE_FLOW_FINISH_DEADLINE action (not mentioned in the javadoc above); NOTE(review): if this throws after addDagAction succeeded, the launch action stays persisted while the lease is not recorded - confirm the catch below handles that
this.numFlowsSubmitted.mark();
// after successfully persisting, close the lease
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 2702a81d5..7543e20e3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -89,11 +89,11 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
private final int lingerMillis;
private final long retentionPeriodMillis;
private String thisTableRetentionStatement;
- private String thisTableGetInfoStatement;
- private String thisTableGetInfoStatementForReminder;
- private String thisTableSelectAfterInsertStatement;
- private String thisTableAcquireLeaseIfMatchingAllStatement;
- private String thisTableAcquireLeaseIfFinishedStatement;
+ private final String thisTableGetInfoStatement; // NOTE(review): thisTableRetentionStatement above was left non-final - confirm whether it is reassigned after construction
+ private final String thisTableGetInfoStatementForReminder;
+ private final String thisTableSelectAfterInsertStatement;
+ private final String thisTableAcquireLeaseIfMatchingAllStatement;
+ private final String thisTableAcquireLeaseIfFinishedStatement;
/*
Notes:
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 9c87bb029..5396b0502 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -19,25 +19,35 @@ package
org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
/**
@@ -45,6 +55,12 @@ import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
  */
 @Slf4j
 public class DagProcUtils {
+  // Looked up lazily through a holder class: the JVM initializes DagActionStoreHolder only on its first access, so
+  // merely loading DagProcUtils (e.g. to call cancelDagNode or getDefaultJobStartDeadline) no longer asks
+  // GobblinServiceManager for a DagActionStore during class initialization.
+  private static class DagActionStoreHolder {
+    private static final DagActionStore INSTANCE = GobblinServiceManager.getClass(DagActionStore.class);
+  }
 
   /**
    * - submits a {@link JobSpec} to a {@link SpecExecutor}
@@ -82,6 +98,9 @@ public class DagProcUtils {
     // either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
     // blocks (by calling Future#get()) until the submission is completed.
     dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+
+    sendEnforceJobStartDeadlineDagAction(dagNode);
+
     Future<?> addSpecFuture = producer.addSpec(jobSpec);
     // todo - we should add future.get() instead of the complete future into the JobExecutionPlan
     dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
@@ -111,4 +130,90 @@ public class DagProcUtils {
       throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Cancels a single dag node: emits a job cancellation event when the node has a job future (otherwise only warns),
+   * asks the node's {@link SpecProducer} to cancel the job (passing the flow execution id and the serialized job future
+   * when known), waits for that cancellation to complete and finally deletes the node's state from the store.
+   *
+   * @throws IOException wrapping any failure; the thread's interrupt status is restored if it was interrupted
+   */
+  public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
+    Properties props = new Properties();
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+    if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+          dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+
+    try {
+      if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+        Future future = dagNodeToCancel.getValue().getJobFuture().get();
+        String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+        props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+        sendCancellationEvent(dagNodeToCancel.getValue());
+      } else {
+        log.warn("No Job future when canceling DAG node (hence, not sending cancellation event) - {}",
+            dagNodeToCancel.getValue().getJobSpec().getUri());
+      }
+      DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props).get();
+      // todo - why was it not being cleaned up in DagManager?
+      dagManagementStateStore.deleteDagNodeState(dagId, dagNodeToCancel);
+    } catch (InterruptedException e) {
+      // restore the interrupt status before translating into the declared IOException
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Cancels every node of {@code dag} via {@link #cancelDagNode}; a failure on one node propagates and skips the rest.
+   */
+  public static void cancelDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore) throws IOException {
+    List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = dag.getNodes();
+    log.info("Found {} DagNodes to cancel (DagId {}).", dagNodesToCancel.size(), DagManagerUtils.generateDagId(dag));
+
+    for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
+      DagProcUtils.cancelDagNode(dagNodeToCancel, dagManagementStateStore);
+    }
+  }
+
+  /**
+   * Emits a {@link TimingEvent.LauncherTimings#JOB_CANCEL} timing event for the job and sets its execution status to
+   * {@link ExecutionStatus#CANCELLED}.
+   */
+  public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+    Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+    DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+    jobExecutionPlan.setExecutionStatus(CANCELLED);
+  }
+
+  /** Adds an ENFORCE_JOB_START_DEADLINE job-level dag action for {@code dagNode} to the {@link DagActionStore}. */
+  private static void sendEnforceJobStartDeadlineDagAction(Dag.DagNode<JobExecutionPlan> dagNode)
+      throws IOException {
+    DagActionStoreHolder.INSTANCE.addJobDagAction(dagNode.getValue().getFlowGroup(), dagNode.getValue().getFlowName(),
+        String.valueOf(dagNode.getValue().getFlowExecutionId()), dagNode.getValue().getJobName(),
+        DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+  }
+
+  /** Adds an ENFORCE_FLOW_FINISH_DEADLINE flow-level dag action for the flow execution of {@code launchDagAction}. */
+  public static void sendEnforceFlowFinishDeadlineDagAction(DagActionStore.DagAction launchDagAction)
+      throws IOException {
+    DagActionStoreHolder.INSTANCE.addFlowDagAction(launchDagAction.getFlowGroup(), launchDagAction.getFlowName(),
+        launchDagAction.getFlowExecutionId(), DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+  }
+
+  /**
+   * Returns the default job start deadline in milliseconds: the {@link DagManager#JOB_START_SLA_TIME} value from
+   * {@code config}, in the {@link TimeUnit} named by {@link DagManager#JOB_START_SLA_UNITS}, each falling back to
+   * the corresponding {@code FALLBACK_GOBBLIN_JOB_START_SLA_*} constant of {@link ConfigurationKeys}.
+   */
+  public static long getDefaultJobStartDeadline(Config config) {
+    TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(
+        config, DagManager.JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
+    return jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, DagManager.JOB_START_SLA_TIME,
+        ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
+  }
 }
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
new file mode 100644
index 000000000..3b1ac2886
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag does not finish in
+ * {@link org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask enforceFlowFinishDeadlineDagTask) {
+    super(enforceFlowFinishDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce flow finish deadline for dag {}", getDagId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceFlowFinishDeadline(dagManagementStateStore, dag.get());
+  }
+
+  /**
+   * Kills the whole dag if the flow's finish deadline has passed, i.e. if now is later than the flow start time plus
+   * the flow SLA (both read from the dag's first node); the dag is then checkpointed with
+   * {@link TimingEvent.FlowTimings#FLOW_RUN_DEADLINE_EXCEEDED}. Otherwise only an error is logged.
+   */
+  private void enforceFlowFinishDeadline(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag)
+      throws IOException {
+    Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
+    long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
+    long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+    // note that this condition should be true because the triggered dag action has waited enough before reaching here
+    if (System.currentTimeMillis() > flowStartTime + flowFinishDeadline) {
+      // shared with KillDagProc: cancels (and deletes the stored state of) every node of the dag
+      DagProcUtils.cancelDag(dag, dagManagementStateStore);
+
+      dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+      dag.setMessage("Flow killed due to exceeding SLA of " + flowFinishDeadline + " ms");
+      dagManagementStateStore.checkpointDag(dag);
+    } else {
+      log.error("EnforceFlowFinishDeadline dagAction received before due time. flowStartTime {}, flowFinishDeadline {} ",
+          flowStartTime, flowFinishDeadline);
+    }
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
new file mode 100644
index 000000000..8cf01be6b
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+
+
+/**
+ * An implementation for {@link DagProc} that cancels a job (dag node) that is still {@link ExecutionStatus#ORCHESTRATED}
+ * after its job start deadline has elapsed since orchestration, and marks its {@link Dag} with
+ * {@link TimingEvent.FlowTimings#FLOW_START_DEADLINE_EXCEEDED}. The deadline comes from
+ * {@link DagManagerUtils#getJobStartSla}, with {@code DagProcessingEngine.getDefaultJobStartSlaTimeMillis()} as the
+ * default (see {@link org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}).
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+  public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask enforceJobStartDeadlineDagTask) {
+    super(enforceJobStartDeadlineDagTask);
+  }
+
+  @Override
+  protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    return dagManagementStateStore.getDag(getDagId());
+  }
+
+  @Override
+  protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+      throws IOException {
+    log.info("Request to enforce job start deadline for dag node {}", getDagNodeId());
+
+    if (!dag.isPresent()) {
+      // todo - add a metric here
+      log.error("Did not find Dag with id {}, it might be already cancelled/finished and thus cleaned up from the store.",
+          getDagId());
+      return;
+    }
+
+    enforceJobStartDeadline(dagManagementStateStore, dag.get());
+  }
+
+  /**
+   * Looks up this proc's dag node and its job status, and cancels the node (checkpointing {@code dag} with
+   * {@link TimingEvent.FlowTimings#FLOW_START_DEADLINE_EXCEEDED}) if the job is still ORCHESTRATED and the job start
+   * deadline has passed. Logs and returns without acting when the node or its job status is missing.
+   */
+  private void enforceJobStartDeadline(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag)
+      throws IOException {
+    Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+        dagNodeToCheckDeadline = dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+    if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+      // this should never happen; a job for which an ENFORCE_JOB_START_DEADLINE dag action is created must have a
+      // dag node in the store
+      // todo - add metrics
+      log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc", getDagNodeId());
+      return;
+    }
+
+    Dag.DagNode<JobExecutionPlan> dagNode = dagNodeToCheckDeadline.getLeft().get();
+    long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+    Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus = dagNodeToCheckDeadline.getRight();
+    if (!jobStatus.isPresent()) {
+      log.error("Some job status should be present for dag node {} that this EnforceJobStartDeadlineDagProc belongs.", getDagNodeId());
+      return;
+    }
+
+    ExecutionStatus executionStatus = ExecutionStatus.valueOf(jobStatus.get().getEventName());
+    long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
+    // note that second condition should be true because the triggered dag action has waited enough before reaching here
+    if (executionStatus == ORCHESTRATED && System.currentTimeMillis() > jobOrchestratedTime + timeOutForJobStart) {
+      log.info("Job exceeded the job start deadline. Killing it now. Job - {}, jobOrchestratedTime - {}, timeOutForJobStart - {}",
+          DagManagerUtils.getJobName(dagNode), jobOrchestratedTime, timeOutForJobStart);
+      dagManagementStateStore.getDagManagerMetrics().incrementCountsStartSlaExceeded(dagNode);
+      DagProcUtils.cancelDagNode(dagNode, dagManagementStateStore);
+      dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+      dag.setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
+      dagManagementStateStore.checkpointDag(dag);
+    } else {
+      log.info("Job start deadline not enforced for job {}: status {}, jobOrchestratedTime {}, timeOutForJobStart {}",
+          DagManagerUtils.getJobName(dagNode), executionStatus, jobOrchestratedTime, timeOutForJobStart);
+    }
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index 5f1c7bb97..15448d62b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -18,28 +18,17 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
-
/**
* An implementation for {@link DagProc} that kills all the nodes of a dag.
@@ -73,55 +62,18 @@ public class KillDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
dag.get().setMessage("Flow killed by request");
-
dagManagementStateStore.checkpointDag(dag.get());
if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel =
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
if (dagNodeToCancel.isPresent()) {
- cancelDagNode(dagNodeToCancel.get());
+ DagProcUtils.cancelDagNode(dagNodeToCancel.get(),
dagManagementStateStore);
} else {
// todo - add a metric here
log.error("Did not find Dag node with id {}, it might be already
cancelled/finished and thus cleaned up from the store.", getDagNodeId());
}
} else {
- List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel =
dag.get().getNodes();
- log.info("Found {} DagNodes to cancel (DagId {}).",
dagNodesToCancel.size(), getDagId());
-
- for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
- cancelDagNode(dagNodeToCancel);
- // todo - why was it not being cleaned up in DagManager?
- dagManagementStateStore.deleteDagNodeState(getDagId(),
dagNodeToCancel);
- }
- }
- }
-
- private void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel)
throws IOException {
- Properties props = new Properties();
- if
(dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
{
- props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
-
dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ DagProcUtils.cancelDag(dag.get(), dagManagementStateStore);
}
-
- try {
- if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
- Future future = dagNodeToCancel.getValue().getJobFuture().get();
- String serializedFuture =
DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
- props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE,
serializedFuture);
- sendCancellationEvent(dagNodeToCancel.getValue());
- } else {
- log.warn("No Job future when canceling DAG node (hence, not sending
cancellation event) - {}",
- dagNodeToCancel.getValue().getJobSpec().getUri());
- }
-
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
props).get();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
- Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
- jobExecutionPlan.setExecutionStatus(CANCELLED);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 6189a9b6e..10529b25e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -23,13 +23,17 @@ import java.util.Set;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.quartz.SchedulerException;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
@@ -39,7 +43,6 @@ import org.apache.gobblin.service.monitoring.JobStatus;
/**
- * suggest:
* A {@link DagProc} to launch any subsequent (dependent) job(s) once all
pre-requisite job(s) in the Dag have succeeded.
* When there are no more jobs to run and no more running, it cleans up the
Dag.
* (In future), if there are multiple new jobs to be launched, separate launch
dag actions are created for each of them.
@@ -76,8 +79,8 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
}
@Override
- protected void act(DagManagementStateStore dagManagementStateStore,
Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus)
- throws IOException {
+ protected void act(DagManagementStateStore dagManagementStateStore,
Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
+ Optional<JobStatus>> dagNodeWithJobStatus) throws IOException {
if (!dagNodeWithJobStatus.getLeft().isPresent()) {
// one of the reason this could arise is when the MALA leasing doesn't
work cleanly and another DagProc::process
// has cleaned up the Dag, yet did not complete the lease before this
current one acquired its own
@@ -115,6 +118,8 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
} else {
dagManagementStateStore.markDagFailed(dag);
}
+
+ removeFlowFinishDeadlineTriggerAndDagAction();
}
}
@@ -191,4 +196,19 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
throw new UnsupportedOperationException("More than one start job is not
allowed");
}
+
+  /**
+   * Best-effort removal of this flow execution's {@link DagActionStore.DagActionType#ENFORCE_FLOW_FINISH_DEADLINE}
+   * reminder trigger and dag action, invoked while the dag is being cleaned up.
+   * Failures are logged and not rethrown.
+   */
+  private void removeFlowFinishDeadlineTriggerAndDagAction() {
+    DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
+        getDagNodeId().getFlowName(), String.valueOf(getDagNodeId().getFlowExecutionId()),
+        DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE);
+    log.info("Deleting reminder trigger and dag action {}", enforceFlowFinishDeadlineDagAction);
+    // todo - add metrics
+
+    try {
+      // unschedule the reminder of the deadline dag action, not of the (reevaluate) dag action currently being processed
+      GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceFlowFinishDeadlineDagAction);
+      GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceFlowFinishDeadlineDagAction);
+    } catch (SchedulerException | IOException e) {
+      log.warn("Failed to unschedule the reminder for {}", enforceFlowFinishDeadlineDagAction, e);
+    }
+  }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
new file mode 100644
index 000000000..5eaf4ffa7
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing a flow execution (all of its jobs) if it has not finished within
+ * {@link org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+public class EnforceFlowFinishDeadlineDagTask extends DagTask {
+  public EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
+    super(dagAction, leaseObtainedStatus, dagActionStore);
+  }
+
+  /** Double-dispatches to the visitor's overload for this task type. */
+  @Override
+  public <T> T host(DagTaskVisitor<T> visitor) {
+    return visitor.meet(this);
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
new file mode 100644
index 000000000..7ad3867a5
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing a job if it has not started within its start deadline
+ * (see {@link org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME} and
+ * {@link org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}).
+ */
+public class EnforceJobStartDeadlineDagTask extends DagTask {
+  public EnforceJobStartDeadlineDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+      DagActionStore dagActionStore) {
+    super(dagAction, leaseObtainedStatus, dagActionStore);
+  }
+
+  /** Double-dispatches to the visitor's overload for this task type. */
+  @Override
+  public <T> T host(DagTaskVisitor<T> visitor) {
+    return visitor.meet(this);
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 97ca4fc45..7210c6725 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -239,6 +239,34 @@ public class JobExecutionPlan {
this.id = DagManagerUtils.calcJobId(jobSpec.getConfig());
}
+  /**
+   * Returns when this flow execution started. A non-zero {@code flowStartTime} (set for a resumed flow) wins;
+   * otherwise the flow execution id is used, which is also the timestamp at which the flow request was received.
+   */
+  public long getFlowStartTime() {
+    if (this.flowStartTime != 0L) {
+      return this.flowStartTime;
+    }
+    return DagManagerUtils.getFlowExecId(this.getJobSpec());
+  }
+
+  /** @return the flow group, read from {@link ConfigurationKeys#FLOW_GROUP_KEY} in the job spec config */
+  public String getFlowGroup() {
+    return configString(ConfigurationKeys.FLOW_GROUP_KEY);
+  }
+
+  /** @return the flow name, read from {@link ConfigurationKeys#FLOW_NAME_KEY} in the job spec config */
+  public String getFlowName() {
+    return configString(ConfigurationKeys.FLOW_NAME_KEY);
+  }
+
+  /** @return the flow execution id, read from {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} in the job spec config */
+  public long getFlowExecutionId() {
+    return this.jobSpec.getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+  }
+
+  /** @return the job group, read from {@link ConfigurationKeys#JOB_GROUP_KEY} in the job spec config */
+  public String getJobGroup() {
+    return configString(ConfigurationKeys.JOB_GROUP_KEY);
+  }
+
+  /** @return the job name, read from {@link ConfigurationKeys#JOB_NAME_KEY} in the job spec config */
+  public String getJobName() {
+    return configString(ConfigurationKeys.JOB_NAME_KEY);
+  }
+
+  /** Reads the string value stored under {@code key} in this plan's job spec config. */
+  private String configString(String key) {
+    return this.jobSpec.getConfig().getString(key);
+  }
+
/**
* Render the JobSpec into a JSON string.
* @return a valid JSON string representation of the JobSpec.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index c2065b7d5..eacecab94 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -58,8 +58,9 @@ public class DagManagementDagActionStoreChangeMonitor extends
DagActionStoreChan
dagAction.getDagActionType(), dagAction);
LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ?
ON_STARTUP : POST_STARTUP;
try {
- // todo - add actions for other other type of dag actions
switch (dagAction.getDagActionType()) {
+ case ENFORCE_FLOW_FINISH_DEADLINE:
+ case ENFORCE_JOB_START_DEADLINE:
case KILL :
case LAUNCH :
case REEVALUATE :
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index c72502340..9d707ffdc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.quartz.SchedulerException;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
@@ -63,6 +64,8 @@ import
org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
@@ -151,6 +154,12 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.eventProducer = observabilityEventProducer;
}
+ public enum NewState {
+ FINISHED,
+ RUNNING,
+ SAME_AS_PREVIOUS,
+ }
+
@Override
protected void startUp() {
super.startUp();
@@ -203,7 +212,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
try (Timer.Context context =
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
- Pair<org.apache.gobblin.configuration.State, Boolean>
updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
+ Pair<org.apache.gobblin.configuration.State, NewState>
updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
jobStatus = updatedJobStatus.getLeft();
String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
@@ -214,13 +223,14 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
String storeName = jobStatusStoreName(flowGroup, flowName);
String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
- if (updatedJobStatus.getRight()) {
+ if (updatedJobStatus.getRight() == NewState.FINISHED) {
this.eventProducer.emitObservabilityEvent(jobStatus);
-
if (this.dagProcEngineEnabled) {
// todo - retried/resumed jobs *may* not be handled here, we may
want to create their dag action elsewhere
this.dagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
}
+ } else if (updatedJobStatus.getRight() == NewState.RUNNING) {
+ removeStartDeadlineTriggerAndDagAction(flowGroup, flowName,
flowExecutionId, jobName);
}
// update the state store after adding a dag action to guaranty
at-least-once adding of dag action
@@ -246,6 +256,20 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
+  /**
+   * Best-effort removal of the {@link DagActionStore.DagActionType#ENFORCE_JOB_START_DEADLINE} reminder trigger and
+   * dag action of a job that has just transitioned to RUNNING. Failures are logged and not rethrown.
+   */
+  private void removeStartDeadlineTriggerAndDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName) {
+    DagActionStore.DagAction enforceStartDeadlineDagAction = new DagActionStore.DagAction(flowGroup, flowName,
+        flowExecutionId, jobName, DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE);
+    log.info("Deleting reminder trigger and dag action {}", enforceStartDeadlineDagAction);
+    // todo - add metrics
+
+    try {
+      GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction);
+      GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceStartDeadlineDagAction);
+    } catch (SchedulerException | IOException e) {
+      // pass the exception as the last argument so the cause and stack trace are logged
+      log.error("Failed to unschedule the reminder for {}", enforceStartDeadlineDagAction, e);
+    }
+  }
+
/**
* It fills missing fields in job status and also merge the fields with the
existing job status in the state store.
* Merging is required because we do not want to lose the information sent
by other GobblinTrackingEvents.
@@ -254,7 +278,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
* @throws IOException
*/
@VisibleForTesting
- static Pair<org.apache.gobblin.configuration.State, Boolean>
recalcJobStatus(org.apache.gobblin.configuration.State jobStatus,
+ static Pair<org.apache.gobblin.configuration.State, NewState>
recalcJobStatus(org.apache.gobblin.configuration.State jobStatus,
StateStore<org.apache.gobblin.configuration.State> stateStore) throws
IOException {
try {
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
@@ -304,7 +328,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
modifyStateIfRetryRequired(jobStatus);
- return ImmutablePair.of(jobStatus,
isNewStateTransitionToFinal(jobStatus, states));
+ return ImmutablePair.of(jobStatus, newState(jobStatus, states));
} catch (Exception e) {
log.warn("Meet exception when adding jobStatus to state store at "
+ e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
@@ -312,6 +336,16 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
}
+  /**
+   * Classifies the transition that {@code jobStatus} represents relative to the previously stored {@code states}.
+   * A transition to a final status takes precedence over a transition to RUNNING.
+   */
+  private static NewState newState(org.apache.gobblin.configuration.State jobStatus, List<org.apache.gobblin.configuration.State> states) {
+    if (isNewStateTransitionToFinal(jobStatus, states)) {
+      return NewState.FINISHED;
+    }
+    return isNewStateTransitionToRunning(jobStatus, states) ? NewState.RUNNING : NewState.SAME_AS_PREVIOUS;
+  }
+
private static boolean isFlowStatusAndPendingResume(String jobName, String
jobGroup, String currentStatus) {
return jobName != null && jobGroup != null &&
jobName.equals(JobStatusRetriever.NA_KEY) &&
jobGroup.equals(JobStatusRetriever.NA_KEY)
&& currentStatus.equals(ExecutionStatus.PENDING_RESUME.name());
@@ -340,6 +374,14 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
&&
!FlowStatusGenerator.FINISHED_STATUSES.contains(prevStates.get(prevStates.size()-1).getProp(JobStatusRetriever.EVENT_NAME_FIELD));
}
+  /**
+   * Returns true when {@code currentState} reports {@link ExecutionStatus#RUNNING} as its event name and the most
+   * recent of {@code prevStates}, if there is one, does not.
+   */
+  static boolean isNewStateTransitionToRunning(org.apache.gobblin.configuration.State currentState, List<org.apache.gobblin.configuration.State> prevStates) {
+    String running = ExecutionStatus.RUNNING.name();
+    if (prevStates.isEmpty()) {
+      return running.equals(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    }
+    boolean currentIsRunning = currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD)
+        && running.equals(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+    if (!currentIsRunning) {
+      return false;
+    }
+    org.apache.gobblin.configuration.State latestPrevState = prevStates.get(prevStates.size() - 1);
+    return !running.equals(latestPrevState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
+  }
+
/**
* Merge states based on precedence defined by {@link
#ORDERED_EXECUTION_STATUSES}.
* The state instance in the 1st argument reflects the more recent state of
a job
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 670e40f80..b9861ede3 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -47,6 +47,9 @@ import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Stage;
import com.linkedin.data.template.StringMap;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.RestLiResponseException;
@@ -58,6 +61,8 @@ import
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.core.GobblinServiceConfiguration;
+import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
@@ -218,8 +223,11 @@ public class GobblinServiceManagerTest {
public static GobblinServiceManager
createTestGobblinServiceManager(Properties serviceCoreProperties,
String serviceName, String serviceId, String serviceWorkDir) {
- GobblinServiceManager gobblinServiceManager =
GobblinServiceManager.create(serviceName, serviceId,
- ConfigUtils.propertiesToConfig(serviceCoreProperties), new
Path(serviceWorkDir));
+ Injector testInjector = Guice.createInjector(Stage.DEVELOPMENT, new
GobblinServiceGuiceModule(
+ new GobblinServiceConfiguration(serviceName, serviceId,
ConfigUtils.propertiesToConfig(serviceCoreProperties),
+ new Path(serviceWorkDir))));
+ GobblinServiceManager gobblinServiceManager =
GobblinServiceManager.getClass(testInjector, GobblinServiceManager.class);
+ gobblinServiceManager.setStaticInjector(testInjector);
DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
doNothing().when(spiedDagManager).setActive(anyBoolean());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index f8c0f8296..070d6a85b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -79,7 +79,7 @@ public class DagManagementTaskStreamImplTest {
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
- false);
+ false, mock(DagManagementStateStore.class));
this.dagProcFactory = new DagProcFactory(null);
this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore, 0);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 9718222a1..d1f1e736d 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -91,7 +91,8 @@ public class DagProcessingEngineTest {
this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
- mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)), false);
+ mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)), false,
+ this.dagManagementStateStore);
this.dagProcFactory = new DagProcFactory(null);
DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
@@ -100,7 +101,7 @@ public class DagProcessingEngineTest {
this.dagTaskStream = spy(new MockedDagTaskStream());
DagProcessingEngine dagProcessingEngine =
new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream),
Optional.ofNullable(this.dagProcFactory),
- Optional.ofNullable(dagManagementStateStore));
+ Optional.ofNullable(dagManagementStateStore), 100000L);
dagProcessingEngine.startAsync();
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
new file mode 100644
index 000000000..4f8f7b365
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  // Created in setUp() rather than in a field initializer: a static mock stays registered on its creating thread until
+  // it is closed, and TestNG instantiates test classes before running them. A mock created at construction time would
+  // therefore still be registered while other test classes that also mock GobblinServiceManager run on the same thread.
+  private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    this.mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    try {
+      // `.close()` to avoid (in the aggregate, across multiple suites) - java.sql.SQLNonTransientConnectionException: Too many connections
+      if (this.testMetastoreDatabase != null) {
+        this.testMetastoreDatabase.close();
+      }
+    } finally {
+      // always release the static mock so later test classes on this thread can mock GobblinServiceManager again
+      if (this.mockedGobblinServiceManager != null) {
+        this.mockedGobblinServiceManager.close();
+      }
+    }
+  }
+
+  /*
+   This test simulates submitting a dag with a very short job start deadline that will definitely be breached,
+   resulting in the job requiring to be killed
+   */
+  @Test
+  public void enforceJobStartDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore =
+        spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME, ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that has not yet started running
+
+    EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new EnforceJobStartDeadlineDagProc(
+        new EnforceJobStartDeadlineDagTask(new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+            "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), null, mock(DagActionStore.class)));
+    enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+    int expectedNumOfDeleteDagNodeStates = 1; // the one dag node corresponding to the EnforceStartDeadlineDagProc
+    // TestNG's Assert.assertEquals takes (actual, expected)
+    Assert.assertEquals(countInvocations(dagManagementStateStore, "deleteDagNodeState"), expectedNumOfDeleteDagNodeStates);
+  }
+
+  /*
+   This test simulates submitting a dag with a very short flow finish deadline that will definitely be breached,
+   resulting in the dag requiring to be killed
+   */
+  @Test
+  public void enforceFlowFinishDeadlineTest() throws Exception {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = System.currentTimeMillis();
+    MostlyMySqlDagManagementStateStore dagManagementStateStore =
+        spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    int numOfDagNodes = 5;
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        numOfDagNodes, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(1L)));
+    JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+    dagManagementStateStore.checkpointDag(dag);  // simulate having a dag that is in running state
+
+    // the flow finish deadline is a flow-level dag action, so it carries no job name
+    EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new EnforceFlowFinishDeadlineDagProc(
+        new EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction.forFlow(flowGroup, flowName, String.valueOf(flowExecutionId),
+            DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE), null, mock(DagActionStore.class)));
+    enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore);
+
+    // every dag node is cancelled; TestNG's Assert.assertEquals takes (actual, expected)
+    Assert.assertEquals(countInvocations(dagManagementStateStore, "deleteDagNodeState"), numOfDagNodes);
+  }
+
+  /** Counts how many times the method named {@code methodName} was invoked on the given Mockito spy or mock. */
+  private static long countInvocations(Object mockOrSpy, String methodName) {
+    return Mockito.mockingDetails(mockOrSpy).getInvocations().stream()
+        .filter(invocation -> invocation.getMethod().getName().equals(methodName)).count();
+  }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index c9642d437..6a625e527 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -42,7 +43,9 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -66,16 +69,24 @@ public class ReevaluateDagProcTest {
private ITestMetastoreDatabase testMetastoreDatabase;
private DagManagementStateStore dagManagementStateStore;
+ private MockedStatic<GobblinServiceManager> mockedGobblinServiceManager;
+ private DagActionStore dagActionStore;
+ private DagActionReminderScheduler dagActionReminderScheduler;
@BeforeClass
public void setUpClass() throws Exception {
this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ this.mockedGobblinServiceManager =
Mockito.mockStatic(GobblinServiceManager.class);
}
@BeforeMethod
public void setUp() throws Exception {
this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
mockDMSSCommonBehavior(dagManagementStateStore);
+ this.dagActionStore = mock(DagActionStore.class);
+ this.dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(this.dagActionStore);
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(this.dagActionReminderScheduler);
}
private void mockDMSSCommonBehavior(DagManagementStateStore
dagManagementStateStore) throws IOException, SpecNotFoundException {
@@ -88,6 +99,7 @@ public class ReevaluateDagProcTest {
@AfterClass(alwaysRun = true)
public void tearDownClass() throws Exception {
+ this.mockedGobblinServiceManager.close();
// `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
this.testMetastoreDatabase.close();
}
@@ -134,6 +146,13 @@ public class ReevaluateDagProcTest {
// current job's state is deleted
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
+
+ // when there is no more job to run in re-evaluate dag proc, it deletes
enforce_flow_finish_dag_action also
+
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
// test when there does not exist a next job in the dag when the current
job's reevaluate dag action is processed
@@ -185,5 +204,11 @@ public class ReevaluateDagProcTest {
// dag is deleted because the only job in the dag is completed
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
+
+
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
+
+
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index fb216910d..ca505f73e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -97,7 +97,7 @@ public abstract class JobStatusRetrieverTest {
properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME,
String.valueOf(endTime));
}
State jobStatus = new State(properties);
- Pair<State, Boolean> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
+ Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
jobStatus = updatedJobStatus.getLeft();
this.jobStatusRetriever.getStateStore().put(
KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 5a595d652..0b05beba2 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -141,7 +141,7 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
jobGroup);
State jobStatus = new State(properties);
- Pair<State, Boolean> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
+ Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
jobStatus = updatedJobStatus.getLeft();
this.jobStatusRetriever.getStateStore().put(
KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
@@ -172,7 +172,7 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
State jobStatus = new State(properties);
try {
- Pair<State, Boolean> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
+ Pair<State, KafkaJobStatusMonitor.NewState> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
jobStatus = updatedJobStatus.getLeft();
this.jobStatusRetriever.getStateStore().put(
KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index cecba15fd..d553c5edf 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -134,6 +134,7 @@ ext.externalDependency = [
// dependency directly to mockito-inline
//
https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#unmockable
"mockito": "org.mockito:mockito-core:4.11.0",
+ "mockitoInline": "org.mockito:mockito-inline:4.11.0",
"salesforceWsc": "com.force.api:force-wsc:" + salesforceVersion,
"salesforcePartner": "com.force.api:force-partner-api:" +
salesforceVersion,
"scala": "org.scala-lang:scala-library:2.11.8",