umustafi commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1610441614
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -135,6 +151,37 @@ public DagTask next() {
}
}
+ private void createJobStartDeadlineTrigger(DagActionStore.DagAction
dagAction) throws SchedulerException, IOException {
+ long timeOutForJobStart =
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
+ dagAction.getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ // todo - this timestamp is just an approximation, the real job submission
has happened in past, and that is when a
Review Comment:
Is this approximation okay for us, or do you plan on updating this in the
future to use the actual SLA deadline from when job submission occurred?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag
does not finish in
+ * {@link
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask
enforceFlowFinishDeadlineDagTask) {
+ super(enforceFlowFinishDeadlineDagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
+ getDagId());
+ return;
+ }
+
+ enforceFlowFinishDeadline(dagManagementStateStore, dag);
+ }
+
+ private void enforceFlowFinishDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+ long flowSla = DagManagerUtils.getFlowSLA(dagNode);
+ long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+ // note that this condition should be true because the triggered dag
action has waited enough before reaching here
+ if (System.currentTimeMillis() > flowStartTime + flowSla) {
+ log.info("Dag {} exceeded the SLA of {} ms. Killing it now...",
getDagId(), flowSla);
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel =
dag.get().getNodes();
+ log.info("Found {} DagNodes to cancel (DagId {}).",
dagNodesToCancel.size(), getDagId());
Review Comment:
Can we consolidate these logs into one so it's easier to track? Let's also
make the message something easily searchable, e.g.
`Dag exceeded Flow Finish SLA. Killing all jobs associated with it. Dag: {}
SLA {} Num nodes to kill {} DagID: {}`. Then the info is all in one log that's
easier to search.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ this.mockedGobblinServiceManager.close();
+ }
+
+ /*
+ This test simulate submitting a dag with a very little job start deadline
and then verifies that the starting job is
+ killed because not being able to start in the deadline time.
+ */
+ @Test
+ public void enforceJobStartDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+ message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+ dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
+ new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+ "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, mock(DagActionStore.class)));
+ enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+ int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
+ Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
+ }
+
+ /*
+ This test simulate submitting a dag with a very little flow finish deadline
and then verifies that all of its job are
Review Comment:
see above
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -124,9 +131,18 @@ public DagTask next() {
while (true) {
try {
DagActionStore.DagAction dagAction = this.dagActionQueue.take();
- LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagAction);
- if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
- return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+ // create triggers for original (non-reminder) dag actions of type
ENFORCE_JOB_START_DEADLINE and ENFORCE_FLOW_FINISH_DEADLINE
+ // reminder triggers are used to inform hosts once the deadline for
ENFORCE_JOB_START and ENFORCE_FLOW_FINISH passed
Review Comment:
Missing "_DEADLINE". Also, nit: let's use a multi-line comment /*....*/
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -86,18 +88,18 @@ public static class ReminderJob implements Job {
@Override
public void execute(JobExecutionContext context) {
// Get properties from the trigger to create a dagAction
- JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+ JobDataMap jobDataMap = context.getMergedJobDataMap();
String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
- String flowId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String flowExecutionId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
- + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+ + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName
+ ", dagActionType: " + dagActionType + ")");
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
- dagActionType);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType);
+ //dagAction.setReminder(true);
Review Comment:
Actually, we have a better method:
pass a flag to `createReminderJobTrigger` when the reminder is created to
denote whether it's actually a reminder or not. For deadline triggers, the
first time they are created we don't set this value; then for any subsequent
attempt we set reminder=True. When we create reminders for all other action
types, we set reminder=True.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -86,18 +88,18 @@ public static class ReminderJob implements Job {
@Override
public void execute(JobExecutionContext context) {
// Get properties from the trigger to create a dagAction
- JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+ JobDataMap jobDataMap = context.getMergedJobDataMap();
String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
- String flowId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ String flowExecutionId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
- + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+ + ", flowExecutionId: " + flowExecutionId + ", jobName: " + jobName
+ ", dagActionType: " + dagActionType + ")");
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
- dagActionType);
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType);
+ //dagAction.setReminder(true);
Review Comment:
Do we not always want to set the reminder flag for actions coming from the
trigger (potentially other than the deadline actions, depending on how we
decide to implement)?
Option 1: set the reminder flag for all events
- handle deadline events knowing that reminder = True is when we enforce
Option 2: set the reminder flag for all events except deadline, if there's
some other job property such as "firstTimeFiring=True"
- set firstTimeFiring=False so all subsequent deadline actions are known to
be true "reminders"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link DagProc} that kills all the jobs if the dag
does not finish in
+ * {@link
org.apache.gobblin.configuration.ConfigurationKeys#GOBBLIN_FLOW_SLA_TIME} time.
+ */
+@Slf4j
+public class EnforceFlowFinishDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask
enforceFlowFinishDeadlineDagTask) {
+ super(enforceFlowFinishDeadlineDagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
+ getDagId());
+ return;
+ }
+
+ enforceFlowFinishDeadline(dagManagementStateStore, dag);
+ }
+
+ private void enforceFlowFinishDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+ long flowSla = DagManagerUtils.getFlowSLA(dagNode);
+ long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+
+ // note that this condition should be true because the triggered dag
action has waited enough before reaching here
+ if (System.currentTimeMillis() > flowStartTime + flowSla) {
Review Comment:
let's add a warn/error log and/or metric (can be TODO) for when this is not
true
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java:
##########
@@ -191,4 +196,16 @@ private void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Da
private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
throw new UnsupportedOperationException("More than one start job is not
allowed");
}
+
+ private void removeFlowFinishDeadlineTriggerAndDagAction() {
+ DagActionStore.DagAction enforceFlowFinishDeadlineDagAction =
DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(),
Review Comment:
let's log when this function is used in case it has unintended consequences
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed
and cancel the job if it does not start in
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
+ super(enforceJobStartDeadlineDagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
+ getDagId());
+ return;
+ }
+
+ enforceJobStartDeadline(dagManagementStateStore, dag);
+ }
+
+ private void enforceJobStartDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+ dagNodeToCheckDeadline =
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+ if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+ // this should never happen; a job for which DEADLINE_ENFORCEMENT dag
action is created must have a dag node in store
+ log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc",
getDagNodeId());
+ return;
+ }
+
+ Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeToCheckDeadline.getLeft().get();
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode,
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus =
dagNodeToCheckDeadline.getRight();
+ if (!jobStatus.isPresent()) {
+ return;
+ }
+
+ ExecutionStatus executionStatus = valueOf(jobStatus.get().getEventName());
+ long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
+ // note that second condition should be true because the triggered dag
action has waited enough before reaching here
+ if (executionStatus == ORCHESTRATED && System.currentTimeMillis() >
jobOrchestratedTime + timeOutForJobStart) {
+ log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing
the job now...",
Review Comment:
I suggest a similar change to this log message: put a cohesive, easily
searchable message first, then all the parameters at the end.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed
and cancel the job if it does not start in
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
+ super(enforceJobStartDeadlineDagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
+ getDagId());
+ return;
+ }
+
+ enforceJobStartDeadline(dagManagementStateStore, dag);
+ }
+
+ private void enforceJobStartDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+ dagNodeToCheckDeadline =
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+ if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+ // this should never happen; a job for which DEADLINE_ENFORCEMENT dag
action is created must have a dag node in store
Review Comment:
add TODO to include a metric here that we will alert on
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
}
}
+ private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String
flowName, String flowExecutionId, String jobName) {
+
Review Comment:
Same as above: let's log when this is used and add Javadoc describing when it
should be called.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not started in
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
or does not finish in
Review Comment:
is this not only for the latter?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+
+
+/**
+ * A {@link DagTask} responsible for killing jobs if they have not started in
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
or does not finish in
Review Comment:
Only the former, right?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ this.mockedGobblinServiceManager.close();
+ }
+
+ /*
+ This test simulate submitting a dag with a very little job start deadline
and then verifies that the starting job is
+ killed because not being able to start in the deadline time.
+ */
+ @Test
+ public void enforceJobStartDeadlineTest() throws Exception {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = System.currentTimeMillis();
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 5, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+ .withValue(ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME,
ConfigValueFactory.fromAnyRef(1L)));
+ JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+ message("Test
message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+ doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+ doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)),
Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+ this.mockedGobblinServiceManager.when(() ->
GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
+ dagManagementStateStore.checkpointDag(dag); // simulate having a dag that
has not yet started running
+
+ EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new
EnforceJobStartDeadlineDagProc(
+ new EnforceJobStartDeadlineDagTask(new
DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId),
+ "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE),
null, mock(DagActionStore.class)));
+ enforceJobStartDeadlineDagProc.process(dagManagementStateStore);
+
+ int expectedNumOfDeleteDagNodeStates = 1; // the one dag node
corresponding to the EnforceStartDeadlineDagProc
+ Assert.assertEquals(expectedNumOfDeleteDagNodeStates,
+
Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count());
Review Comment:
do both deadlines result in same method being called?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * An implementation for {@link DagProc} that marks the {@link Dag} as failed
and cancel the job if it does not start in
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagManager#JOB_START_SLA_TIME}
time.
+ */
+@Slf4j
+public class EnforceJobStartDeadlineDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
+
+ public EnforceJobStartDeadlineDagProc(EnforceJobStartDeadlineDagTask
enforceJobStartDeadlineDagTask) {
+ super(enforceJobStartDeadlineDagTask);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ return dagManagementStateStore.getDag(getDagId());
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ log.info("Request to enforce deadlines for dag {}", getDagId());
+
+ if (!dag.isPresent()) {
+ // todo - add a metric here
+ log.error("Did not find Dag with id {}, it might be already
cancelled/finished and thus cleaned up from the store.",
+ getDagId());
+ return;
+ }
+
+ enforceJobStartDeadline(dagManagementStateStore, dag);
+ }
+
+ private void enforceJobStartDeadline(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>,
Optional<org.apache.gobblin.service.monitoring.JobStatus>>
+ dagNodeToCheckDeadline =
dagManagementStateStore.getDagNodeWithJobStatus(getDagNodeId());
+ if (!dagNodeToCheckDeadline.getLeft().isPresent()) {
+ // this should never happen; a job for which DEADLINE_ENFORCEMENT dag
action is created must have a dag node in store
+ log.error("Dag node {} not found for EnforceJobStartDeadlineDagProc",
getDagNodeId());
+ return;
+ }
+
+ Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeToCheckDeadline.getLeft().get();
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode,
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ Optional<org.apache.gobblin.service.monitoring.JobStatus> jobStatus =
dagNodeToCheckDeadline.getRight();
+ if (!jobStatus.isPresent()) {
Review Comment:
Does this mean the job finished? If so, let's add a comment saying that.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceFlowFinishDeadlineDagTask;
+import
org.apache.gobblin.service.modules.orchestration.task.EnforceJobStartDeadlineDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class EnforceDeadlineDagProcsTest {
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private final MockedStatic<GobblinServiceManager>
mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ // `.close()` to avoid (in the aggregate, across multiple suites) -
java.sql.SQLNonTransientConnectionException: Too many connections
+ this.testMetastoreDatabase.close();
+ this.mockedGobblinServiceManager.close();
+ }
+
+ /*
+ This test simulate submitting a dag with a very little job start deadline
and then verifies that the starting job is
Review Comment:
"with very short job start deadline that will definitely be breached,
resulting in the job needing to be killed..." (or something to that effect)
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java:
##########
@@ -134,6 +146,12 @@ public void testOneNextJobToRun() throws Exception {
// current job's state is deleted
Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+
Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1);
+
+
Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
Review Comment:
why are these called if the reminder fired and action was taken? I would
expect deleteDagAction because it means the action has been completed
(committed)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -246,6 +256,19 @@ protected void
processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
}
}
+ private void clearStartDeadlineTriggerAndDagAction(String flowGroup, String
flowName, String flowExecutionId, String jobName) {
+
Review Comment:
Also add a TODO for a metric here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]