This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ceca1e5de [GOBBLIN-2022] add re-evaluate dag proc (#3896)
ceca1e5de is described below
commit ceca1e5ded294711ec44ca183fe4675161beb963
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Apr 4 10:28:29 2024 -0700
[GOBBLIN-2022] add re-evaluate dag proc (#3896)
* add re-evaluate dag proc
address review comments
fix test
merge conflicts
---
.../src/main/avro/DagActionStoreChangeEvent.avsc | 6 +-
.../runtime/KafkaAvroJobStatusMonitorTest.java | 7 +-
.../modules/orchestration/DagActionStore.java | 2 +-
.../orchestration/DagManagementStateStore.java | 7 +-
.../orchestration/DagManagementTaskStreamImpl.java | 3 +
.../modules/orchestration/DagProcFactory.java | 7 +
.../modules/orchestration/DagTaskVisitor.java | 2 +
.../MostlyMySqlDagManagementStateStore.java | 10 +-
.../modules/orchestration/proc/DagProc.java | 4 +
.../modules/orchestration/proc/LaunchDagProc.java | 8 +-
.../orchestration/proc/ReevaluateDagProc.java | 194 +++++++++++++++++++++
.../modules/orchestration/task/DagTask.java | 2 +-
.../ReevaluateDagTask.java} | 21 ++-
.../DagManagementDagActionStoreChangeMonitor.java | 3 +-
.../monitoring/KafkaAvroJobStatusMonitor.java | 7 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 59 +++++--
.../monitoring/KafkaJobStatusMonitorFactory.java | 16 +-
.../gobblin/service/GobblinServiceManagerTest.java | 4 +-
.../DagManagementTaskStreamImplTest.java | 6 +
.../orchestration/DagProcessingEngineTest.java | 28 ++-
.../MostlyMySqlDagManagementStateStoreTest.java | 2 +-
.../MysqlMultiActiveLeaseArbiterTest.java | 9 +-
.../orchestration/MysqlUserQuotaManagerTest.java | 10 +-
.../orchestration/proc/LaunchDagProcTest.java | 9 +
.../orchestration/proc/ReevaluateDagProcTest.java | 177 +++++++++++++++++++
.../service/monitoring/JobStatusRetrieverTest.java | 8 +-
.../monitoring/MysqlJobStatusRetrieverTest.java | 40 ++++-
27 files changed, 581 insertions(+), 70 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index 60eeb154a..c6d94a628 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -36,12 +36,14 @@
"symbols": [
"KILL",
"RESUME",
- "LAUNCH"
+ "LAUNCH",
+ "REEVALUATE"
],
"symbolDocs": {
"KILL": "Kill the flow corresponding to this dag",
"RESUME": "Resume or start a new flow corresponding to this dag",
- "LAUNCH": "Launch a new execution of the flow corresponding to this
dag"
+ "LAUNCH": "Launch a new execution of the flow corresponding to this
dag",
+ "REEVALUATE": "Re-evaluate what needs to be done upon receipt of a
final job status"
}
},
"doc" : "type of dag action",
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index d9102a1be..94980017c 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -71,6 +71,7 @@ import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueReposi
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.monitoring.GaaSObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
@@ -97,6 +98,7 @@ public class KafkaAvroJobStatusMonitorTest {
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
private MetricContext context;
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
+ private MysqlDagActionStore mysqlDagActionStore;
@BeforeClass
public void setUp() throws Exception {
@@ -115,6 +117,7 @@ public class KafkaAvroJobStatusMonitorTest {
builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
builder =
builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+ this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
}
@Test
@@ -775,7 +778,7 @@ public class KafkaAvroJobStatusMonitorTest {
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads,
AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle,
GaaSObservabilityEventProducer producer)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads, mock(JobIssueEventHandler.class),
producer);
+ super(topic, config, numThreads, mock(JobIssueEventHandler.class),
producer, mysqlDagActionStore);
shouldThrowFakeExceptionInParseJobStatus =
shouldThrowFakeExceptionInParseJobStatusToggle;
}
@@ -791,7 +794,7 @@ public class KafkaAvroJobStatusMonitorTest {
/**
* Overridden to stub potential exception within core processing of
`processMessage` (specifically retried portion).
- * Although truly plausible (IO)Exceptions would originate from
`KafkaJobStatusMonitor.addJobStatusToStateStore`,
+ * Although truly plausible (IO)Exceptions would originate from
`KafkaJobStatusMonitor.updateJobStatus`,
* that is `static`, so unavailable for override. The approach here is a
pragmatic compromise, being simpler than
* the alternative of writing a mock `StateStore.Factory` that the
`KafkaJobStatusMonitor` ctor could reflect,
* instantiate, and finally create a mock `StateStore` from.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 94beebd16..eafb05750 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -35,7 +35,7 @@ public interface DagActionStore {
LAUNCH, // Launch new flow execution invoked adhoc or through scheduled
trigger
RETRY, // Invoked through DagManager for flows configured to allow retries
CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
- ADVANCE // Launch next step in multi-hop dag
+ REEVALUATE // Re-evaluate what needs to be done upon receipt of a final
job status
}
@Data
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 274abeaa0..b56459da7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -132,11 +132,12 @@ public interface DagManagementStateStore {
void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId
dagId) throws IOException;
/**
- * Returns the requested {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link
JobStatus},
- * or Optional.absent if it is not found.
+ * Returns the requested {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} and its {@link
JobStatus}.
+ * Both elements of the returned pair are Optionals and are empty if not present in the
store.
+ * JobStatus can be non-empty only if DagNode is non-empty.
* @param dagNodeId of the dag node
*/
- Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId);
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId);
/**
* Returns a list of {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} for a {@link Dag}.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 4409a673f..8ad246aa2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -41,6 +41,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import org.apache.gobblin.util.ConfigUtils;
@@ -161,6 +162,8 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
switch (dagActionType) {
case LAUNCH:
return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
+ case REEVALUATE:
+ return new ReevaluateDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
default:
throw new UnsupportedOperationException(dagActionType + " not yet
implemented");
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
index 24e65eb0b..ef7aea3d2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -23,8 +23,10 @@ import com.google.inject.Singleton;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.ReevaluateDagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@@ -49,6 +51,11 @@ public class DagProcFactory implements
DagTaskVisitor<DagProc> {
public LaunchDagProc meet(LaunchDagTask launchDagTask) {
return new LaunchDagProc(launchDagTask,
this.flowCompilationValidationHelper);
}
+
+ @Override
+ public ReevaluateDagProc meet(ReevaluateDagTask reEvaluateDagTask) {
+ return new ReevaluateDagProc(reEvaluateDagTask);
+ }
//todo - overload meet method for other dag tasks
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
index 0b27b4e9d..e946f9835 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
@@ -18,8 +18,10 @@
package org.apache.gobblin.service.modules.orchestration;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
public interface DagTaskVisitor<T> {
T meet(LaunchDagTask launchDagTask);
+ T meet(ReevaluateDagTask reevaluateDagTask);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 5304c8003..b010a0cb5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -211,12 +211,12 @@ public class MostlyMySqlDagManagementStateStore
implements DagManagementStateSto
return this.dagStateStore.existsDag(dagId);
}
- public Optional<Pair<Dag.DagNode<JobExecutionPlan>, JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
- Optional<JobStatus> jobStatus = getJobStatus(dagNodeId);
- if (this.dagNodes.containsKey(dagNodeId) && jobStatus.isPresent()) {
- return Optional.of(ImmutablePair.of(this.dagNodes.get(dagNodeId),
jobStatus.get()));
+ public Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
getDagNodeWithJobStatus(DagNodeId dagNodeId) {
+ if (this.dagNodes.containsKey(dagNodeId)) {
+ return ImmutablePair.of(Optional.of(this.dagNodes.get(dagNodeId)),
getJobStatus(dagNodeId));
} else {
- return Optional.empty();
+ // no point in searching for the status if the node itself is absent.
+ return ImmutablePair.of(Optional.empty(), Optional.empty());
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 4f0a254b5..fece28d8c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -40,6 +40,10 @@ import
org.apache.gobblin.service.modules.orchestration.task.DagTask;
* actions based on the type of {@link DagTask}. Submitting events in time is
found to be important in
* <a href="https://github.com/apache/gobblin/pull/3641">PR#3641</a>, hence
initialize and act methods submit events as
* they happen.
+ * Type parameter T is the type of the object that must be initialized before the
dag proc does its main work in the `act` method.
+ * This object is then passed to `act` method.
+ *
+ * @param <T> type of the initialization "state" on which to {@link
DagProc#act}
*/
@Alpha
@Data
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 4cef3b7a7..482cb563d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -77,6 +77,8 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
// todo - add metrics
} else {
submitNextNodes(dagManagementStateStore, dag.get());
+ //Checkpoint the dag state, it should have an updated value of dag nodes
+ dagManagementStateStore.checkpointDag(dag.get());
orchestrationDelayCounter.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag.get()));
}
}
@@ -84,8 +86,7 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
/**
* Submit next set of Dag nodes in the provided Dag.
*/
- private void submitNextNodes(DagManagementStateStore
dagManagementStateStore,
- Dag<JobExecutionPlan> dag) throws IOException {
+ private void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
if (nextNodes.size() > 1) {
@@ -97,9 +98,6 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), getDagId());
}
-
- //Checkpoint the dag state, it should have an updated value of dag nodes
- dagManagementStateStore.checkpointDag(dag);
}
private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
new file mode 100644
index 000000000..6189a9b6e
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * A {@link DagProc} to launch any subsequent (dependent) job(s) once all
+ * pre-requisite job(s) in the Dag have succeeded.
+ * When there are no more jobs to run and no more running, it cleans up the
Dag.
+ * (In future), if there are multiple new jobs to be launched, separate launch
dag actions are created for each of them.
+ */
+@Slf4j
+public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>> {
+
+ public ReevaluateDagProc(ReevaluateDagTask reEvaluateDagTask) {
+ super(reEvaluateDagTask);
+ }
+
+ @Override
+ protected Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
initialize(DagManagementStateStore dagManagementStateStore)
+ throws IOException {
+ Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus =
+ dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId);
+
+ if (!dagNodeWithJobStatus.getLeft().isPresent() ||
!dagNodeWithJobStatus.getRight().isPresent()) {
+ // this is possible when MALA malfunctions and a duplicated reevaluate
dag proc is launched for a dag node that is
+ // already "reevaluated" and cleaned up.
+ return ImmutablePair.of(Optional.empty(), Optional.empty());
+ }
+
+ ExecutionStatus executionStatus =
ExecutionStatus.valueOf(dagNodeWithJobStatus.getRight().get().getEventName());
+ if
(!FlowStatusGenerator.FINISHED_STATUSES.contains(executionStatus.name())) {
+ log.warn("Job status for dagNode {} is {}. Re-evaluate dag action should
have been created only for finished status - {}",
+ dagNodeId, executionStatus, FlowStatusGenerator.FINISHED_STATUSES);
+ // this may happen if adding job status in the store failed after adding
a ReevaluateDagAction in KafkaJobStatusMonitor
+ throw new RuntimeException(String.format("Job status %s is not final for
job %s", executionStatus, getDagId()));
+ }
+
+ setStatus(dagManagementStateStore, dagNodeWithJobStatus.getLeft().get(),
executionStatus);
+ return dagNodeWithJobStatus;
+ }
+
+ @Override
+ protected void act(DagManagementStateStore dagManagementStateStore,
Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>
dagNodeWithJobStatus)
+ throws IOException {
+ if (!dagNodeWithJobStatus.getLeft().isPresent()) {
+ // one of the reasons this could arise is when the MALA leasing doesn't
work cleanly and another DagProc::process
+ // has cleaned up the Dag, yet did not complete the lease before this
current one acquired its own
+ log.error("DagNode or its job status not found for a Reevaluate
DagAction with dag node id {}", this.dagNodeId);
+ // todo - add metrics to count such occurrences
+ return;
+ }
+
+ Dag.DagNode<JobExecutionPlan> dagNode =
dagNodeWithJobStatus.getLeft().get();
+ JobStatus jobStatus = dagNodeWithJobStatus.getRight().get();
+ ExecutionStatus executionStatus = dagNode.getValue().getExecutionStatus();
+ Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
+ onJobFinish(dagManagementStateStore, dagNode, executionStatus, dag);
+
+ if (jobStatus.isShouldRetry()) {
+ log.info("Retrying job: {}, current attempts: {}, max attempts: {}",
+ DagManagerUtils.getFullyQualifiedJobName(dagNode),
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
+ // todo - be careful when unsetting this, it is possible that this is
set to FAILED because some other job in the
+ // dag failed and is also not retryable. in that case if this job's
retry passes, overall status of the dag can be
+ // set to PASS, which would be incorrect.
+ dag.setFlowEvent(null);
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
+ } else if (!dagManagementStateStore.hasRunningJobs(getDagId())) {
+ if (dag.getFlowEvent() == null) {
+ // If the dag flow event is not set and there are no more jobs
running, then it is successful
+ // also note that `onJobFinish` method does whatever is required to do
after job finish, determining a Dag's
+ // status is not possible on individual job's finish status
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
+ }
+ String flowEvent = dag.getFlowEvent();
+ DagManagerUtils.emitFlowEvent(eventSubmitter, dag, flowEvent);
+ if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
+ // todo - verify if work from PR#3641 is required
+ dagManagementStateStore.deleteDag(getDagId());
+ } else {
+ dagManagementStateStore.markDagFailed(dag);
+ }
+ }
+ }
+
+ /**
+ * Sets status of a dag node inside the given Dag.
+ * todo - DMSS should support this functionality like an atomic get-and-set
operation.
+ */
+ private void setStatus(DagManagementStateStore dagManagementStateStore,
+ Dag.DagNode<JobExecutionPlan> dagNode, ExecutionStatus executionStatus)
throws IOException {
+ Dag<JobExecutionPlan> dag =
dagManagementStateStore.getDag(getDagId()).get();
+ DagNodeId dagNodeId = dagNode.getValue().getId();
+ for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
+ if (node.getValue().getId().equals(dagNodeId)) {
+ node.getValue().setExecutionStatus(executionStatus);
+ dagManagementStateStore.checkpointDag(dag);
+ return;
+ }
+ }
+ log.error("DagNode with id {} not found in Dag {}", dagNodeId, getDagId());
+ }
+
+ /**
+ * Method that defines the actions to be performed when a job finishes
either successfully or with failure.
+ * This method updates the state of the dag and performs clean up actions as
necessary.
+ */
+ private void onJobFinish(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode,
+ ExecutionStatus executionStatus, Dag<JobExecutionPlan> dag) throws
IOException {
+ String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
+ log.info("Job {} of Dag {} has finished with status {}", jobName,
getDagId(), executionStatus.name());
+ // Only decrement counters and quota for jobs that actually ran on the
executor, not from a GaaS side failure/skip event
+ if (dagManagementStateStore.releaseQuota(dagNode)) {
+
dagManagementStateStore.getDagManagerMetrics().decrementRunningJobMetrics(dagNode);
+ }
+
+ switch (executionStatus) {
+ case FAILED:
+ dag.setMessage("Flow failed because job " + jobName + " failed");
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_FAILED);
+
dagManagementStateStore.getDagManagerMetrics().incrementExecutorFailed(dagNode);
+ break;
+ case CANCELLED:
+ dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ break;
+ case COMPLETE:
+
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
+ submitNextNodes(dagManagementStateStore, dag);
+ break;
+ default:
+ log.warn("It should not reach here. Job status {} is unexpected.",
executionStatus);
+ }
+
+ //Checkpoint the dag state, it should have an updated value of dag fields
+ dagManagementStateStore.checkpointDag(dag);
+ dagManagementStateStore.deleteDagNodeState(getDagId(), dagNode);
+ }
+
+ /**
+ * Submit next set of Dag nodes in the provided Dag.
+ */
+ private void submitNextNodes(DagManagementStateStore
dagManagementStateStore, Dag<JobExecutionPlan> dag) {
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+
+ if (nextNodes.size() > 1) {
+ handleMultipleJobs(nextNodes);
+ }
+
+ if (!nextNodes.isEmpty()) {
+ Dag.DagNode<JobExecutionPlan> nextNode =
nextNodes.stream().findFirst().get();
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, nextNode,
getDagId());
+ log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(nextNode), getDagId());
+ }
+ }
+
+ private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
+ throw new UnsupportedOperationException("More than one start job is not
allowed");
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index bc12926c7..f031134d1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -57,7 +57,7 @@ public abstract class DagTask {
* work on this task, is done in this method.
* Returns true if concluding dag task finished successfully otherwise false.
*/
- public boolean conclude() {
+ public final boolean conclude() {
try {
this.dagActionStore.deleteDagAction(this.dagAction);
return this.leaseObtainedStatus.completeLease();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
similarity index 52%
copy from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
copy to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
index 0b27b4e9d..7a90c9912 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java
@@ -15,11 +15,24 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.orchestration;
+package org.apache.gobblin.service.modules.orchestration.task;
-import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
-public interface DagTaskVisitor<T> {
- T meet(LaunchDagTask launchDagTask);
+/**
+ * A {@link DagTask} responsible for handling re-evaluate dag actions.
+ */
+
+public class ReevaluateDagTask extends DagTask {
+ public ReevaluateDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+ DagActionStore dagActionStore) {
+ super(dagAction, leaseObtainedStatus, dagActionStore);
+ }
+
+ public <T> T host(DagTaskVisitor<T> visitor) {
+ return visitor.meet(this);
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index e7843f1af..413fbcdc5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -61,10 +61,11 @@ public class DagManagementDagActionStoreChangeMonitor
extends DagActionStoreChan
// todo - add actions for other types of dag actions
switch (dagAction.getDagActionType()) {
case LAUNCH :
+ case REEVALUATE :
dagManagement.addDagAction(dagAction);
break;
default:
- log.warn("Received unsupported dagAction {}. Expected to be a
LAUNCH", dagAction.getDagActionType());
+ log.warn("Received unsupported dagAction {}. Expected to be a
REEVALUATE or LAUNCH", dagAction.getDagActionType());
this.unexpectedErrors.mark();
}
} catch (IOException e) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index c152970cb..4aced6afb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -46,6 +46,7 @@ import
org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.util.ConfigUtils;
@@ -64,10 +65,10 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
@Getter
private Meter messageParseFailures;
- public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
- JobIssueEventHandler jobIssueEventHandler,
GaaSObservabilityEventProducer observabilityEventProducer)
+ public KafkaAvroJobStatusMonitor(String topic, Config config, int
numThreads, JobIssueEventHandler jobIssueEventHandler,
+ GaaSObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
throws IOException, ReflectiveOperationException {
- super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer);
+ super(topic, config, numThreads, jobIssueEventHandler,
observabilityEventProducer, dagActionStore);
if (ConfigUtils.getBoolean(config,
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new
KafkaAvroSchemaRegistryFactory().
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 8ae588231..ab6d62f93 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -27,6 +27,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.rholder.retry.Attempt;
@@ -60,6 +63,7 @@ import
org.apache.gobblin.runtime.troubleshooter.IssueEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.retry.RetryerFactory;
@@ -112,10 +116,11 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
private final JobIssueEventHandler jobIssueEventHandler;
private final Retryer<Void> persistJobStatusRetryer;
private final GaaSObservabilityEventProducer eventProducer;
-
+ private final DagActionStore dagActionStore;
+ private final boolean dagProcEngineEnabled;
public KafkaJobStatusMonitor(String topic, Config config, int numThreads,
JobIssueEventHandler jobIssueEventHandler,
- GaaSObservabilityEventProducer observabilityEventProducer)
+ GaaSObservabilityEventProducer observabilityEventProducer,
DagActionStore dagActionStore)
throws ReflectiveOperationException {
super(topic, config.withFallback(DEFAULTS), numThreads);
String stateStoreFactoryClass = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY,
FileContextBasedFsStateStoreFactory.class.getName());
@@ -125,6 +130,8 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.jobIssueEventHandler = jobIssueEventHandler;
+ this.dagActionStore = dagActionStore;
+ this.dagProcEngineEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
Config retryerOverridesConfig =
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
@@ -191,10 +198,33 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
persistJobStatusRetryer.call(() -> {
// re-create `jobStatus` on each attempt, since mutated within
`addJobStatusToStateStore`
org.apache.gobblin.configuration.State jobStatus =
parseJobStatus(gobblinTrackingEvent);
- if (jobStatus != null) {
- try (Timer.Context context =
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
- addJobStatusToStateStore(jobStatus, this.stateStore,
this.eventProducer);
+ if (jobStatus == null) {
+ return null;
+ }
+
+ try (Timer.Context context =
getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
+ Pair<org.apache.gobblin.configuration.State, Boolean>
updatedJobStatus = recalcJobStatus(jobStatus, this.stateStore);
+ jobStatus = updatedJobStatus.getLeft();
+
+ String flowName =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ String flowGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowExecutionId =
jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ String jobName =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup =
jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ String storeName = jobStatusStoreName(flowGroup, flowName);
+ String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
+
+ if (updatedJobStatus.getRight()) {
+ this.eventProducer.emitObservabilityEvent(jobStatus);
+
+ if (this.dagProcEngineEnabled) {
+ // todo - retried/resumed jobs *may* not be handled here, we may
want to create their dag action elsewhere
+ this.dagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+ }
}
+
+ // update the state store after adding a dag action to guarantee
at-least-once adding of dag action
+ stateStore.put(storeName, tableName, jobStatus);
}
return null;
});
@@ -217,17 +247,15 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
/**
- * Persist job status to the underlying {@link StateStore}.
- * It fills missing fields in job status and also merge the fields with the
- * existing job status in the state store. Merging is required because we
- * do not want to lose the information sent by other GobblinTrackingEvents.
- * @param jobStatus
+ * Fills missing fields in the job status and merges its fields with the
existing job status in the state store.
+ * Merging is required because we do not want to lose the information sent
by other GobblinTrackingEvents.
+ * Returns a pair of the job status as updated by this method and a
boolean indicating whether the new job status is
+ * now in a final state and was not in a final state before this method was
called.
* @throws IOException
*/
@VisibleForTesting
- static void addJobStatusToStateStore(org.apache.gobblin.configuration.State
jobStatus, StateStore stateStore,
- GaaSObservabilityEventProducer eventProducer)
- throws IOException {
+ static Pair<org.apache.gobblin.configuration.State, Boolean>
recalcJobStatus(org.apache.gobblin.configuration.State jobStatus,
+ StateStore<org.apache.gobblin.configuration.State> stateStore) throws
IOException {
try {
if (!jobStatus.contains(TimingEvent.FlowEventConstants.JOB_NAME_FIELD)) {
jobStatus.setProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
JobStatusRetriever.NA_KEY);
@@ -276,10 +304,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
modifyStateIfRetryRequired(jobStatus);
- stateStore.put(storeName, tableName, jobStatus);
- if (isNewStateTransitionToFinal(jobStatus, states)) {
- eventProducer.emitObservabilityEvent(jobStatus);
- }
+ return ImmutablePair.of(jobStatus,
isNewStateTransitionToFinal(jobStatus, states));
} catch (Exception e) {
log.warn("Meet exception when adding jobStatus to state store at "
+ e.getStackTrace()[0].getClassName() + "line number: " +
e.getStackTrace()[0].getLineNumber(), e);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 0b80b4f2e..ecc759333 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -24,6 +24,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import javax.inject.Inject;
+import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
@@ -33,6 +34,8 @@ import
org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -49,24 +52,29 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
private final JobIssueEventHandler jobIssueEventHandler;
private final MultiContextIssueRepository issueRepository;
private final boolean instrumentationEnabled;
+ private final DagActionStore dagActionStore;
+ private final boolean dagProcEngineEnabled;
@Inject
public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
- GobblinInstanceEnvironment env) {
- this(config, jobIssueEventHandler, issueRepository,
env.isInstrumentationEnabled());
+ GobblinInstanceEnvironment env, DagActionStore dagActionStore,
@Named(InjectionNames.DAG_PROC_ENGINE_ENABLED) boolean dagProcEngineEnabled) {
+ this(config, jobIssueEventHandler, issueRepository,
env.isInstrumentationEnabled(), dagActionStore, dagProcEngineEnabled);
}
public KafkaJobStatusMonitorFactory(Config config, JobIssueEventHandler
jobIssueEventHandler, MultiContextIssueRepository issueRepository,
- boolean instrumentationEnabled) {
+ boolean instrumentationEnabled, DagActionStore dagActionStore, boolean
dagProcEngineEnabled) {
this.config = Objects.requireNonNull(config);
this.jobIssueEventHandler = Objects.requireNonNull(jobIssueEventHandler);
this.issueRepository = issueRepository;
this.instrumentationEnabled = instrumentationEnabled;
+ this.dagActionStore = dagActionStore;
+ this.dagProcEngineEnabled = dagProcEngineEnabled;
}
private KafkaJobStatusMonitor createJobStatusMonitor()
throws ReflectiveOperationException {
Config jobStatusConfig =
config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
+ jobStatusConfig =
jobStatusConfig.withValue(InjectionNames.DAG_PROC_ENGINE_ENABLED,
ConfigValueFactory.fromAnyRef(this.dagProcEngineEnabled));
String topic =
jobStatusConfig.getString(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_TOPIC_KEY);
int numThreads = ConfigUtils.getInt(jobStatusConfig,
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_NUM_THREADS_KEY, 5);
@@ -95,7 +103,7 @@ public class KafkaJobStatusMonitorFactory implements
Provider<KafkaJobStatusMoni
observabilityEventProducerClassName,
ConfigUtils.configToState(config), this.issueRepository,
this.instrumentationEnabled);
return (KafkaJobStatusMonitor) GobblinConstructorUtils
- .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer);
+ .invokeLongestConstructor(jobStatusMonitorClass, topic,
jobStatusConfig, numThreads, jobIssueEventHandler, observabilityEventProducer,
dagActionStore);
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 3dcc106f5..3bae27069 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -124,6 +124,7 @@ public class GobblinServiceManagerTest {
private FlowConfigV2Client flowConfigClient;
private MySQLContainer mysql;
+ ITestMetastoreDatabase testMetastoreDatabase;
private Git gitForPush;
private TestingServer testingServer;
@@ -145,7 +146,7 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_USERNAME,
mysql.getUsername());
serviceCoreProperties.put(ServiceConfigKeys.SERVICE_DB_PASSWORD,
mysql.getPassword());
- ITestMetastoreDatabase testMetastoreDatabase =
TestMetastoreDatabaseFactory.get();
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
testingServer = new TestingServer(true);
@@ -267,6 +268,7 @@ public class GobblinServiceManagerTest {
}
mysql.stop();
+ testMetastoreDatabase.close();
}
/**
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 2e677f59d..ff75d906a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Optional;
import org.junit.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -110,4 +111,9 @@ public class DagManagementTaskStreamImplTest {
DagProc dagProc = dagTask.host(this.dagProcFactory);
Assert.assertNotNull(dagProc);
}
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+   testMetastoreDatabase.close(); // alwaysRun: still close the test metastore database when an earlier configuration method failed
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index f5ed96867..077c5dd90 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -43,6 +45,8 @@ import
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.testing.AssertWithBackoff;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -55,11 +59,18 @@ public class DagProcessingEngineTest {
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ static ITestMetastoreDatabase testMetastoreDatabase;
+ static DagActionStore dagActionStore;
+ static LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@BeforeClass
public void setUp() throws Exception {
// Setting up mock DB
- ITestMetastoreDatabase testMetastoreDatabase =
TestMetastoreDatabaseFactory.get();
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ dagActionStore = mock(DagActionStore.class);
+ doReturn(true).when(dagActionStore).deleteDagAction(any());
+ leaseObtainedStatus = mock(LeaseAttemptStatus.LeaseObtainedStatus.class);
+ doReturn(true).when(leaseObtainedStatus).completeLease();
Config config;
ConfigBuilder configBuilder = ConfigBuilder.create();
@@ -120,7 +131,7 @@ public class DagProcessingEngineTest {
private final boolean isBad;
public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
- super(dagAction, null, null);
+ super(dagAction, leaseObtainedStatus, dagActionStore);
this.isBad = isBad;
}
@@ -128,11 +139,6 @@ public class DagProcessingEngineTest {
public <T> T host(DagTaskVisitor<T> visitor) {
return (T) new MockedDagProc(this, this.isBad);
}
-
- @Override
- public boolean conclude() {
- return false;
- }
}
static class MockedDagProc extends DagProc<Void> {
@@ -163,7 +169,8 @@ public class DagProcessingEngineTest {
// This tests verifies that all the dag tasks entered to the dag task stream
are retrieved by dag proc engine threads
@Test
- public void dagProcessingTest() throws InterruptedException,
TimeoutException {
+ public void dagProcessingTest()
+ throws InterruptedException, TimeoutException, IOException {
// there are MAX_NUM_OF_TASKS dag tasks returned and then each thread
additionally call (infinitely) once to wait
// in this unit tests, it does not infinitely wait though, because the
mocked task stream throws an exception on
// (MAX_NUM_OF_TASKS + 1) th call
@@ -176,4 +183,9 @@ public class DagProcessingEngineTest {
Assert.assertEquals(this.dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);
}
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+   testMetastoreDatabase.close(); // alwaysRun: still close the database obtained in setUp when an earlier configuration method failed
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index ceb1b7b9f..649a6e80a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -87,7 +87,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
Assert.assertTrue(this.dagManagementStateStore.containsDag(dagId));
Assert.assertEquals(dag.toString(),
this.dagManagementStateStore.getDag(dagId).get().toString());
- Assert.assertEquals(dagNode,
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).get().getLeft());
+ Assert.assertEquals(dagNode,
this.dagManagementStateStore.getDagNodeWithJobStatus(dagNodeId).getLeft().get());
Assert.assertEquals(dag.toString(),
this.dagManagementStateStore.getParentDag(dagNode).get().toString());
List<Dag.DagNode<JobExecutionPlan>> dagNodes =
this.dagManagementStateStore.getDagNodes(dagId);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 93003a99a..603db5e19 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -23,6 +23,7 @@ import java.sql.Timestamp;
import java.util.Optional;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -65,12 +66,13 @@ public class MysqlMultiActiveLeaseArbiterTest {
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
TABLE);
private String formattedAcquireLeaseIfFinishedStatement =
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
TABLE);
+ ITestMetastoreDatabase testDb;
// The setup functionality verifies that the initialization of the tables is
done correctly and verifies any SQL
// syntax errors.
@BeforeClass
public void setUp() throws Exception {
- ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+ this.testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
.addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
EPSILON)
@@ -352,4 +354,9 @@ public class MysqlMultiActiveLeaseArbiterTest {
Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
}
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+   this.testDb.close(); // alwaysRun: still close the database obtained in setUp when an earlier configuration method failed
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index cf39adbfc..de4bcacee 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -39,10 +40,11 @@ public class MysqlUserQuotaManagerTest {
private static final String PROXY_USER = "abora";
private MysqlUserQuotaManager quotaManager;
public static int INCREMENTS = 1000;
+ ITestMetastoreDatabase testDb;
@BeforeClass
public void setUp() throws Exception {
- ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+ testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
.addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
@@ -53,7 +55,6 @@ public class MysqlUserQuotaManagerTest {
this.quotaManager = new MysqlUserQuotaManager(config);
}
-
@Test
public void testRunningDagStore() throws Exception {
String dagId =
DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "",
1).getNodes().get(0)).toString();
@@ -167,4 +168,9 @@ public class MysqlUserQuotaManagerTest {
thread6.join();
Assert.assertEquals(this.quotaManager.getCount(PROXY_USER,
AbstractUserQuotaManager.CountType.USER_COUNT), -1);
}
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws IOException {
+ testDb.close(); // closes the ITestMetastoreDatabase obtained in setUp; alwaysRun keeps this cleanup from being skipped after a configuration failure
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 21b9fc28b..9c75c0808 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -34,6 +35,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
@@ -61,9 +63,11 @@ import static org.mockito.Mockito.spy;
public class LaunchDagProcTest {
private MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ private ITestMetastoreDatabase testMetastoreDatabase;
@BeforeClass
public void setUp() throws Exception {
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(TestMetastoreDatabaseFactory.get()));
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
@@ -89,6 +93,11 @@ public class LaunchDagProcTest {
.filter(a ->
a.getMethod().getName().equals("addDagNodeState")).count());
}
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+   this.testMetastoreDatabase.close(); // NOTE(review): setUp hands getDummyDMSS a second TestMetastoreDatabaseFactory.get() instance, which is not closed here
+ }
+
// This creates a dag like this
// D1 D2 D3
// \ | /
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
new file mode 100644
index 000000000..94dfd9c99
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.task.ReevaluateDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+/** Tests {@link ReevaluateDagProc} against a Mockito spy of the dummy dag management state store. */
+public class ReevaluateDagProcTest {
+  private final long flowExecutionId = System.currentTimeMillis();
+  private final String flowGroup = "fg";
+
+  // stubs the state-store calls (flow spec lookup, quota, dag node state) that these tests do not exercise for real
+  void mockDMSS(DagManagementStateStore dagManagementStateStore) throws IOException, SpecNotFoundException {
+    doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doNothing().when(dagManagementStateStore).addDagNodeState(any(), any());
+    doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+  }
+
+  // job0 is reported COMPLETE and a next job exists, so processing its REEVALUATE action must submit exactly one job
+  @Test
+  public void testOneNextJobToRun() throws Exception {
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    DagManagementStateStore dagManagementStateStore = spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
+    mockDMSS(dagManagementStateStore);
+    String flowName = "fn";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)));
+    List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+      try {
+        return DagManagerUtils.getSpecProducer(n);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+    JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
+    doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
+
+    ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), "job0", DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+    reEvaluateDagProc.process(dagManagementStateStore);
+
+    long addSpecCount = specProducers.stream()
+        .mapToLong(p -> Mockito.mockingDetails(p)
+            .getInvocations()
+            .stream()
+            .filter(a -> a.getMethod().getName().equals("addSpec"))
+            .count())
+        .sum();
+
+    // next job is sent to spec producer
+    Assert.assertEquals(addSpecCount, 1L);
+
+    // current job's state is deleted
+    Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+        .filter(a -> a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+    testMetastoreDatabase.close();
+  }
+
+  // job0 is the only job in the dag, so processing its REEVALUATE action must submit nothing and delete the dag
+  @Test
+  public void testNoNextJobToRun() throws Exception {
+    ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    DagManagementStateStore dagManagementStateStore = spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(testMetastoreDatabase));
+    mockDMSS(dagManagementStateStore);
+    String flowName = "fn2";
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("2", flowExecutionId, DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)));
+    JobStatus jobStatus = JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup).jobName("job0").flowExecutionId(flowExecutionId).
+        message("Test message").eventName(ExecutionStatus.COMPLETE.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
+
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any());
+    doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any());
+    doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any());
+    doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+    doNothing().when(dagManagementStateStore).deleteDagNodeState(any(), any());
+
+    List<SpecProducer<Spec>> specProducers = dag.getNodes().stream().map(n -> {
+      try {
+        return DagManagerUtils.getSpecProducer(n);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+
+    ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction(flowGroup, flowName,
+        String.valueOf(flowExecutionId), "job0", DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class)));
+    reEvaluateDagProc.process(dagManagementStateStore);
+
+    // count addSpec invocations only after process() has run; counting earlier would make the check below always pass
+    long addSpecCount = specProducers.stream()
+        .mapToLong(p -> Mockito.mockingDetails(p)
+            .getInvocations()
+            .stream()
+            .filter(a -> a.getMethod().getName().equals("addSpec"))
+            .count())
+        .sum();
+
+    // no new job to launch for this one job flow
+    Assert.assertEquals(addSpecCount, 0L);
+
+    // current job's state is deleted
+    Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+        .filter(a -> a.getMethod().getName().equals("deleteDagNodeState")).count(), 1);
+
+    // dag is deleted because the only job in the dag is completed
+    Assert.assertEquals(Mockito.mockingDetails(dagManagementStateStore).getInvocations().stream()
+        .filter(a -> a.getMethod().getName().equals("deleteDag")).count(), 1);
+
+    testMetastoreDatabase.close();
+  }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 345b5d7cc..fb216910d 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.lang3.tuple.Pair;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -96,7 +97,12 @@ public abstract class JobStatusRetrieverTest {
properties.setProperty(TimingEvent.JOB_ORCHESTRATED_TIME,
String.valueOf(endTime));
}
State jobStatus = new State(properties);
- KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore(), new
NoopGaaSObservabilityEventProducer());
+ Pair<State, Boolean> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
+ jobStatus = updatedJobStatus.getLeft();
+ this.jobStatusRetriever.getStateStore().put(
+ KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
+ KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup,
jobName),
+ jobStatus);
}
static Properties createAttemptsProperties(int currGen, int currAttempts,
boolean shouldRetry) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 66d552e54..591e65f66 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;
+import org.apache.commons.lang3.tuple.Pair;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -45,11 +46,12 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
private MysqlJobStatusStateStore<State> dbJobStateStore;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
+ private ITestMetastoreDatabase testMetastoreDatabase;
@BeforeClass
@Override
public void setUp() throws Exception {
- ITestMetastoreDatabase testMetastoreDatabase =
TestMetastoreDatabaseFactory.get();
+ this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
ConfigBuilder configBuilder = ConfigBuilder.create();
@@ -127,15 +129,24 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
long flowExecutionId = 12340L;
String flowGroup = Strings.repeat("A",
ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH);
String flowName = Strings.repeat("B",
ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+ String jobGroup = Strings.repeat("D",
ServiceConfigKeys.MAX_JOB_GROUP_LENGTH);
+ String jobName = Strings.repeat("C",
ServiceConfigKeys.MAX_JOB_NAME_LENGTH);
+
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
String.valueOf(flowExecutionId));
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.ORCHESTRATED.name());
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
jobGroup);
State jobStatus = new State(properties);
- KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore(), new
NoopGaaSObservabilityEventProducer());
+ Pair<State, Boolean> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
+ jobStatus = updatedJobStatus.getLeft();
+ this.jobStatusRetriever.getStateStore().put(
+ KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
+ KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup,
jobName),
+ jobStatus);
+
Iterator<JobStatus>
jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId);
Assert.assertTrue(jobStatusIterator.hasNext());
@@ -148,18 +159,26 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
long flowExecutionId = 12340L;
String flowGroup = Strings.repeat("A",
ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + 1);
String flowName = Strings.repeat("B",
ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+ String jobGroup = Strings.repeat("D",
ServiceConfigKeys.MAX_JOB_GROUP_LENGTH);
+ String jobName = Strings.repeat("C",
ServiceConfigKeys.MAX_JOB_NAME_LENGTH);
+
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
flowGroup);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowName);
properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
String.valueOf(flowExecutionId));
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobName);
properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.ORCHESTRATED.name());
- properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
jobGroup);
State jobStatus = new State(properties);
try {
- KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus,
this.jobStatusRetriever.getStateStore(), new
NoopGaaSObservabilityEventProducer());
+ Pair<State, Boolean> updatedJobStatus =
KafkaJobStatusMonitor.recalcJobStatus(jobStatus,
this.jobStatusRetriever.getStateStore());
+ jobStatus = updatedJobStatus.getLeft();
+ this.jobStatusRetriever.getStateStore().put(
+ KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, flowName),
+ KafkaJobStatusMonitor.jobStatusTableName(flowExecutionId, jobGroup,
jobName),
+ jobStatus);
} catch (IOException e) {
- Assert.assertTrue(e.getCause().getCause().getMessage().contains("Data
too long"));
+ Assert.assertTrue(e.getCause().getMessage().contains("Data too long"));
return;
}
Assert.fail();
@@ -169,4 +188,9 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
void cleanUpDir() throws Exception {
this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP,
FLOW_NAME));
}
+
+ @Override
+ public void tearDown() throws Exception {
+ this.testMetastoreDatabase.close(); // closes the ITestMetastoreDatabase that setUp() stored in this field
+ }
}