[
https://issues.apache.org/jira/browse/GOBBLIN-2015?focusedWorklogId=909498&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-909498
]
ASF GitHub Bot logged work on GOBBLIN-2015:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Mar/24 20:44
Start Date: 12/Mar/24 20:44
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3893:
URL: https://github.com/apache/gobblin/pull/3893#discussion_r1522074443
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,127 @@
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ FlowCompilationValidationHelper flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
+ AtomicLong orchestrationDelayCounter = new AtomicLong(0);
Review Comment:
The name of this `orchestrationDelayMetric/Counter` is not accurate for how we
are using it here. The "orchestration" step we are talking about should have
already occurred for the flow in the `Orchestrator`, where we measured how long
the compilation takes. We may, however, want to emit a separate metric if it
feels helpful.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ FlowCompilationValidationHelper flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
+ AtomicLong orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ if (!dag.isPresent()) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to
launch");
+ return Optional.empty();
Review Comment:
We have Flow group, executor, and service level metrics at present in
`DagManagerMetrics`. The entire metrics class instance should be passed to the
DagProc to increment accordingly.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ FlowCompilationValidationHelper flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
+ AtomicLong orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ if (!dag.isPresent()) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to
launch");
+ return Optional.empty();
+ }
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag.get());
+ Set<Dag.DagNode<JobExecutionPlan>> nextSubmitted =
submitNext(dagManagementStateStore, dag.get());
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted) {
+ dagManagementStateStore.addDagNodeState(dagNode, dagId); // compare
this - arjun1
+ }
+
+ log.info("Dag {} processed.", dagId);
+ return dag;
}
- @Override
- protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter)
- throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ /**
+ * Submit next set of Dag nodes in the Dag identified by the provided dagId
+ */
+ private Set<Dag.DagNode<JobExecutionPlan>>
submitNext(DagManagementStateStore dagManagementStateStore,
+ Dag<JobExecutionPlan> dag) throws IOException {
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+ List<String> nextJobNames = new ArrayList<>();
+
+ //Submit jobs from the dag ready for execution.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ submitJob(dagManagementStateStore, dagNode);
+ nextJobNames.add(DagManagerUtils.getJobName(dagNode));
+ }
+
+ log.info("Submitting next nodes for dagId {}, where next jobs to be
submitted are {}", dagId, nextJobNames);
+
+ //Checkpoint the dag state, it should have an updated value of dag nodes
+ dagManagementStateStore.checkpointDag(dag);
+
+ return nextNodes;
+ }
+
+ /**
+ * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+ */
+ private void submitJob(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode) {
Review Comment:
Rename to `submitJobToExecutor`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ FlowCompilationValidationHelper flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
+ AtomicLong orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
Review Comment:
I agree; let's start delving into retrying errors, or at least planning for it,
in this PR. `Process` should be able to distinguish retryable errors from
non-retryable ones.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ FlowCompilationValidationHelper flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
+ AtomicLong orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ if (!dag.isPresent()) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to
launch");
+ return Optional.empty();
+ }
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag.get());
+ Set<Dag.DagNode<JobExecutionPlan>> nextSubmitted =
submitNext(dagManagementStateStore, dag.get());
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted) {
+ dagManagementStateStore.addDagNodeState(dagNode, dagId); // compare
this - arjun1
+ }
+
+ log.info("Dag {} processed.", dagId);
+ return dag;
}
- @Override
- protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter)
- throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ /**
+ * Submit next set of Dag nodes in the Dag identified by the provided dagId
+ */
+ private Set<Dag.DagNode<JobExecutionPlan>>
submitNext(DagManagementStateStore dagManagementStateStore,
+ Dag<JobExecutionPlan> dag) throws IOException {
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+ List<String> nextJobNames = new ArrayList<>();
+
+ //Submit jobs from the dag ready for execution.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ submitJob(dagManagementStateStore, dagNode);
+ nextJobNames.add(DagManagerUtils.getJobName(dagNode));
+ }
+
+ log.info("Submitting next nodes for dagId {}, where next jobs to be
submitted are {}", dagId, nextJobNames);
+
+ //Checkpoint the dag state, it should have an updated value of dag nodes
+ dagManagementStateStore.checkpointDag(dag);
+
+ return nextNodes;
+ }
+
+ /**
+ * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+ */
+ private void submitJob(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode) {
+ DagManagerUtils.incrementJobAttempt(dagNode);
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+ JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ // Run this spec on selected executor
+ SpecProducer<Spec> producer;
+ try {
+ dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
+ producer = DagManagerUtils.getSpecProducer(dagNode);
+ TimingEvent jobOrchestrationTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+ // Increment job count before submitting the job onto the spec producer,
in case that throws an exception.
+ // By this point the quota is allocated, so it's imperative to increment
as missing would introduce the potential to decrement below zero upon quota
release.
+ // Quota release is guaranteed, despite failure, because exception
handling within would mark the job FAILED.
+ // When the ensuing kafka message spurs DagManager processing, the quota
is released and the counts decremented
+ // Ensure that we do not double increment for flows that are retried
+ if (dagNode.getValue().getCurrentAttempts() == 1) {
+
DagManagementTaskStreamImpl.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+ }
+ // Submit the job to the SpecProducer, which in turn performs the actual
job submission to the SpecExecutor instance.
+ // The SpecProducer implementations submit the job to the underlying
executor and return when the submission is complete,
+ // either successfully or unsuccessfully. To catch any exceptions in the
job submission, the DagManagerThread
+ // blocks (by calling Future#get()) until the submission is completed.
+ Future<?> addSpecFuture = producer.addSpec(jobSpec);
+
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
+
+ addSpecFuture.get();
+
+ jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
+ // Add serialized job properties as part of the orchestrated job event
metadata
+ jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
dagNode.getValue().toString());
+ jobOrchestrationTimer.stop(jobMetadata);
+ log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
+
DagManagementTaskStreamImpl.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
+ } catch (Exception e) {
+ TimingEvent jobFailedTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+ String message = "Cannot submit job " +
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " +
specExecutorUri;
+ log.error(message, e);
+ jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " +
e.getMessage());
+ if (jobFailedTimer != null) {
+ jobFailedTimer.stop(jobMetadata);
Review Comment:
No error is thrown here, so how does `DagProc::process` know whether to
complete the lease? This seems like a case of a retryable error.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -40,36 +65,126 @@
@Alpha
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
- private final AtomicLong orchestrationDelayCounter;
+ FlowCompilationValidationHelper flowCompilationValidationHelper;
- public LaunchDagProc(LaunchDagTask launchDagTask) {
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.launchDagTask = launchDagTask;
- this.orchestrationDelayCounter = new AtomicLong(0);
+ AtomicLong orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
metricContext.register(orchestrationDelayMetric);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ try {
+ DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
+ FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(flowUri);
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ throw new RuntimeException(e);
+ }
}
@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
- throw new UnsupportedOperationException("Not yet implemented");
+ if (!dag.isPresent()) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to
launch");
+ return Optional.empty();
+ }
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag.get());
+ Set<Dag.DagNode<JobExecutionPlan>> nextSubmitted =
submitNext(dagManagementStateStore, dag.get());
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted) {
+ dagManagementStateStore.addDagNodeState(dagNode, dagId); // compare
this - arjun1
+ }
+
+ log.info("Dag {} processed.", dagId);
Review Comment:
This statement is a bit ambiguous. Let's be specific, e.g.
`Launch dagProc concluded actions for dagID: DagID {}`. We should have a
similar message for each type of DagProc. Do we want a log after initialization
as well? Let's think carefully so we don't flood the logs.
Issue Time Tracking
-------------------
Worklog Id: (was: 909498)
Remaining Estimate: 0h
Time Spent: 10m
> implement LaunchDagProc
> -----------------------
>
> Key: GOBBLIN-2015
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2015
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)