phet commented on code in PR #3893:
URL: https://github.com/apache/gobblin/pull/3893#discussion_r1522580123
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -75,7 +75,7 @@ public DagProcessingEngine(Config config, DagTaskStream
dagTaskStream, DagProcFa
@AllArgsConstructor
@VisibleForTesting
- static class DagProcEngineThread implements Runnable {
+ public static class DagProcEngineThread implements Runnable {
Review Comment:
I don't see any new uses in this PR. Since it's only `@VisibleForTesting`, it's
actually better to keep it package-private than to make it `public`.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java:
##########
@@ -52,14 +52,9 @@ public class MostlyMySqlDagManagementStateStoreTest {
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
private static final String TEST_TABLE = "quotas";
- static ITestMetastoreDatabase testMetastoreDatabase;
-
-
- @BeforeClass
- public void setUp() throws Exception {
- // Setting up mock DB
- testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ public static MostlyMySqlDagManagementStateStore
getDummyDMSS(ITestMetastoreDatabase testMetastoreDatabase) throws Exception {
+ MostlyMySqlDagManagementStateStore dagManagementStateStore;
Review Comment:
Why declare it here, ahead of where it's initialized on line 72?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.Maps;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
/**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
*/
@Slf4j
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
+ private static final AtomicLong orchestrationDelayCounter = new
AtomicLong(0);
+ static {
+ metricContext.register(
+
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get));
+ }
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
Review Comment:
Could this use Lombok's `@RequiredArgsConstructor` instead of the hand-written constructor?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.Maps;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
/**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
*/
@Slf4j
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
+ private static final AtomicLong orchestrationDelayCounter = new
AtomicLong(0);
+ static {
+ metricContext.register(
+
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get));
+ }
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
- ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
- (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
- metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ if (!dag.isPresent()) {
+ log.warn("Dag with id " + this.launchDagTask.getDagId() + " could not be
compiled.");
+ // todo - add metrics
+ return Optional.empty();
+ }
+ submitNextNodes(dagManagementStateStore, dag.get());
+ log.info("Launch dagProc concluded actions for dagId : {}",
this.launchDagTask.getDagId());
Review Comment:
See my other comment suggesting that this logging be moved to `DagProc::process`.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -184,6 +184,43 @@ static Dag<JobExecutionPlan> buildDag(String id, Long
flowExecutionId, String fl
return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
}
+ // This creates a dag like this
+ // D1 D2 D3
+ // \ | /
+ // DN4
+ // / | \
+ // D5 D6 D7
+ public static Dag<JobExecutionPlan>
buildDagWithMultipleNodesAtDifferentLevels(String id, Long flowExecutionId,
String flowFailureOption,
Review Comment:
Since this isn't a test but a utility method that appears unused in this
defining class, does it really belong here?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ FlowCompilationValidationHelper flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
+ AtomicLong orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ if (!dag.isPresent()) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to
launch");
+ return Optional.empty();
+ }
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag.get());
+ Set<Dag.DagNode<JobExecutionPlan>> nextSubmitted =
submitNext(dagManagementStateStore, dag.get());
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted) {
+ dagManagementStateStore.addDagNodeState(dagNode, dagId); // compare
this - arjun1
+ }
+
+ log.info("Dag {} processed.", dagId);
+ return dag;
}
- @Override
- protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter)
- throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ /**
+ * Submit next set of Dag nodes in the Dag identified by the provided dagId
+ */
+ private Set<Dag.DagNode<JobExecutionPlan>>
submitNext(DagManagementStateStore dagManagementStateStore,
+ Dag<JobExecutionPlan> dag) throws IOException {
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+ List<String> nextJobNames = new ArrayList<>();
+
+ //Submit jobs from the dag ready for execution.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ submitJob(dagManagementStateStore, dagNode);
+ nextJobNames.add(DagManagerUtils.getJobName(dagNode));
+ }
+
+ log.info("Submitting next nodes for dagId {}, where next jobs to be
submitted are {}", dagId, nextJobNames);
+
+ //Checkpoint the dag state, it should have an updated value of dag nodes
+ dagManagementStateStore.checkpointDag(dag);
+
+ return nextNodes;
+ }
+
+ /**
+ * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+ */
+ private void submitJob(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode) {
+ DagManagerUtils.incrementJobAttempt(dagNode);
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+ JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ // Run this spec on selected executor
+ SpecProducer<Spec> producer;
+ try {
+ dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+ producer = DagManagerUtils.getSpecProducer(dagNode);
+ TimingEvent jobOrchestrationTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+ // Increment job count before submitting the job onto the spec producer,
in case that throws an exception.
+ // By this point the quota is allocated, so it's imperative to increment
as missing would introduce the potential to decrement below zero upon quota
release.
+ // Quota release is guaranteed, despite failure, because exception
handling within would mark the job FAILED.
+ // When the ensuing kafka message spurs DagManager processing, the quota
is released and the counts decremented
+ // Ensure that we do not double increment for flows that are retried
+ if (dagNode.getValue().getCurrentAttempts() == 1) {
+
DagManagementTaskStreamImpl.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
Review Comment:
any thoughts?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -18,58 +18,165 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.Maps;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
/**
- * An implementation for {@link LaunchDagTask}
+ * An implementation for {@link DagProc} that launches a new job.
*/
@Slf4j
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
+ private static final AtomicLong orchestrationDelayCounter = new
AtomicLong(0);
+ static {
+ metricContext.register(
+
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get));
+ }
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
- ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
- (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
- metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ if (!dag.isPresent()) {
+ log.warn("Dag with id " + this.launchDagTask.getDagId() + " could not be
compiled.");
+ // todo - add metrics
+ return Optional.empty();
+ }
+ submitNextNodes(dagManagementStateStore, dag.get());
+ log.info("Launch dagProc concluded actions for dagId : {}",
this.launchDagTask.getDagId());
+ return dag;
}
- @Override
- protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter)
- throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ /**
+ * Submit next set of Dag nodes in the provided Dag.
+ */
+ private void submitNextNodes(DagManagementStateStore
dagManagementStateStore,
+ Dag<JobExecutionPlan> dag) throws IOException {
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+
+ //Submit jobs from the dag ready for execution.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ submitJobToExecutor(dagManagementStateStore, dagNode);
+ dagManagementStateStore.addDagNodeState(dagNode,
this.launchDagTask.getDagId());
+ log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), this.launchDagTask.getDagId());
+ }
+
+ //Checkpoint the dag state, it should have an updated value of dag nodes
+ dagManagementStateStore.checkpointDag(dag);
+ }
+
+ /**
+ * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+ */
+ private void submitJobToExecutor(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+ DagManagerUtils.incrementJobAttempt(dagNode);
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ // Run this spec on selected executor
+ SpecProducer<Spec> producer;
+ try {
+ producer = DagManagerUtils.getSpecProducer(dagNode);
+ TimingEvent jobOrchestrationTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+ // Increment job count before submitting the job onto the spec producer,
in case that throws an exception.
+ // By this point the quota is allocated, so it's imperative to increment
as missing would introduce the potential to decrement below zero upon quota
release.
+ // Quota release is guaranteed, despite failure, because exception
handling within would mark the job FAILED.
+ // When the ensuing kafka message spurs DagManager processing, the quota
is released and the counts decremented
+ // Ensure that we do not double increment for flows that are retried
+ if (DagManagerUtils.getJobExecutionPlan(dagNode).getCurrentAttempts() ==
1) {
+
DagManagementTaskStreamImpl.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+ }
+ // Submit the job to the SpecProducer, which in turn performs the actual
job submission to the SpecExecutor instance.
+ // The SpecProducer implementations submit the job to the underlying
executor and return when the submission is complete,
+ // either successfully or unsuccessfully. To catch any exceptions in the
job submission, the DagManagerThread
+ // blocks (by calling Future#get()) until the submission is completed.
+ dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+ Future<?> addSpecFuture = producer.addSpec(jobSpec);
+
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
Review Comment:
Maybe add a TODO in `JobExecutionPlan` to store the already-forced value
instead of the future?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Optional;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStoreTest;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class LaunchDagProcTest {
+ MostlyMySqlDagManagementStateStore dagManagementStateStore;
+ DagManagementTaskStreamImpl dagManagementTaskStream;
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.dagManagementTaskStream = new
DagManagementTaskStreamImpl(ConfigBuilder.create().build(), Optional.empty());
+ this.dagManagementStateStore =
spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(TestMetastoreDatabaseFactory.get()));
+
doReturn(FlowSpec.builder().build()).when(this.dagManagementStateStore).getFlowSpec(any());
+ doNothing().when(this.dagManagementStateStore).tryAcquireQuota(any());
+ doNothing().when(this.dagManagementStateStore).addDagNodeState(any(),
any());
+ }
+ @Test
+ public void launchDag()
+ throws IOException, InterruptedException, URISyntaxException {
+ // this creates a dag with 3 start nodes
+ Dag<JobExecutionPlan> dag1 =
DagManagerTest.buildDagWithMultipleNodesAtDifferentLevels("1",
System.currentTimeMillis(), DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("group2")));
+ FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
+
doReturn(com.google.common.base.Optional.of(dag1)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
+ LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "fn",
+ "12345", DagActionStore.FlowActionType.LAUNCH), null),
flowCompilationValidationHelper);
+
+ launchDagProc.process(this.dagManagementStateStore);
+ int expectedNumOfSavingDagNodeStates = 3; // = number of start nodes
+ Assert.assertEquals(expectedNumOfSavingDagNodeStates,
+
Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
+ .filter(a ->
a.getMethod().getName().equals("addDagNodeState")).count());
+ System.out.println();
Review Comment:
Remove this `System.out.println()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]