[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=904486&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-904486
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Feb/24 00:40
Start Date: 10/Feb/24 00:40
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1484848855
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+ boolean containsDag(String dagId);
+ Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+ Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
Review Comment:
naming confuses me. is this equivalent to `getParentDag`? if so, shouldn't
that be a method on `Dag.DagNode`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
Review Comment:
add javadoc to clarify if return value is same as param passed in or was the
previous value that might now be replaced...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+ boolean containsDag(String dagId);
+ Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+ Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+ List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+ List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+ boolean addFailedDag(String dagId);
Review Comment:
nit: `markDagFailed`? it doesn't add a `Dag`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+ boolean containsDag(String dagId);
+ Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+ Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+ List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+ List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+ boolean addFailedDag(String dagId);
+ boolean existsFailedDag(String dagId);
+ boolean addCleanUpDag(String dagId);
+ boolean checkCleanUpDag(String dagId);
Review Comment:
if `existsFailedDag` and `checkCleanUpDag` are parallels, let's align the
naming
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+ boolean containsDag(String dagId);
+ Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+ Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+ List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+ List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+ boolean addFailedDag(String dagId);
+ boolean existsFailedDag(String dagId);
+ boolean addCleanUpDag(String dagId);
+ boolean checkCleanUpDag(String dagId);
+ void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+ void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+ void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;
+ Map<String, Long> getDagToSLA();
Review Comment:
1. we have two "SLAs"... which is this?
2. they aren't truly SLAs, but deadlines, so suggest changing naming (even
if we retain dual config for compatibility (e.g. `flow.start.deadline`))
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,33 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+public interface DagTaskVisitor<D extends DagProc> {
+ LaunchDagProc meet(LaunchDagTask launchDagTask);
Review Comment:
let's not hard-code the type mapping between each `DagTask` and `DagProc`. in
fact, let's not even hard-code that each `DagTask` must turn into a `DagProc`.
this is the reason for having a generic param: so that every `meet` method
returns the generic type. to be fully general, the type param should stand
alone rather than being bounded by `extends DagProc`.
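A minimal sketch of what an unbounded type parameter could look like. The types below are simplified stand-ins, not the PR's actual classes; `host` and `TaskLabelVisitor` are assumed names used only for illustration:

```java
// Simplified stand-ins for the PR's task types (the real classes carry more state).
interface DagTask {
  // `host` is an assumed name for the double-dispatch entry point.
  <T> T host(DagTaskVisitor<T> visitor);
}

final class LaunchDagTask implements DagTask {
  @Override
  public <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

final class KillDagTask implements DagTask {
  @Override
  public <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

// The type parameter stands alone: every `meet` returns T, with no `extends DagProc` bound.
interface DagTaskVisitor<T> {
  T meet(LaunchDagTask launchDagTask);
  T meet(KillDagTask killDagTask);
}

// A visitor that produces no DagProc at all, only a label for each task type.
final class TaskLabelVisitor implements DagTaskVisitor<String> {
  @Override
  public String meet(LaunchDagTask launchDagTask) {
    return "launch";
  }

  @Override
  public String meet(KillDagTask killDagTask) {
    return "kill";
  }
}
```

With this shape, a factory that does return procs would simply be a `DagTaskVisitor<DagProc>`, while other visitors stay free to return anything else.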
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+ private final LaunchDagTask launchDagTask;
+ NewDagManager newDagManager;
+
+ public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager newDagManager) {
+ this.launchDagTask = launchDagTask;
+ this.newDagManager = newDagManager;
+ }
+
+ @Override
+ protected Dag<JobExecutionPlan> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ return dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+ }
+
+ @Override
+ protected Void act(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException {
+ if (dag == null) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+ return null;
+ }
+ initializeDag(dag, dagManagementStateStore);
+ DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
+ return null;
+ }
+
+ protected void initializeDag(Dag<JobExecutionPlan> dag, DagManagementStateStore dagManagementStateStore)
+ throws IOException {
+ //Add Dag to the map of running dags
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+ log.info("Initializing Dag {}", DagManagerUtils.getFullyQualifiedDagName(dag));
+
+ //A flag to indicate if the flow is already running.
+ boolean isDagRunning = false;
+ //Are there any jobs already in the running state? This check is for Dags already running
+ //before a leadership change occurs.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ if (DagManagerUtils.getExecutionStatus(dagNode) == ExecutionStatus.RUNNING) {
+ dagManagementStateStore.addJobState(dagId, dagNode);
+ //Update the running jobs counter.
+ NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+ isDagRunning = true;
+ }
+ }
+
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
+ NewDagManager.getDagManagerMetrics().registerFlowMetric(flowId, dag);
+
+ log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
+ //Determine the next set of jobs to run and submit them for execution
+ Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagManagementStateStore, dagId);
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted.get(dagId)) {
+ dagManagementStateStore.addJobState(dagId, dagNode);
+ }
+
+ // Set flow status to running
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
Review Comment:
I originally thought `act` might set things up, but `commit` would actually
record them and therefore send out GTEs of this sort... we should discuss...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/event/JobStatusEvent.java:
##########
@@ -0,0 +1,53 @@
+
+package org.apache.gobblin.service.monitoring.event;
+
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+
+/**
+ * An object that {@link org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor} emits when it receives a final
+ * job status GTE.
+ */
+@Getter
+public class JobStatusEvent {
+ State jobStatusState;
+ String flowGroup;
+ String flowName;
+ long flowExecutionId;
+ String jobGroup;
+ String jobName;
+ ExecutionStatus status;
+ JobStatus jobStatus;
Review Comment:
what are the pros/cons of having the KJSM send these along vs. having the
recipient look them up fresh from the `DMStateStore`? I would think the latter
approach would guard against out-of-date info...
(clearly, if you removed these, you'd need to rename this class...
`UpdatedJobEvent`, since you'd only send info about the job that was updated,
not specifically about what was updated...)
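To make the trade-off concrete, here is a rough sketch of the second option, where the event only names the job and the recipient reads the current status from a store. Everything below is hypothetical: `InMemoryJobStatusStore` stands in for the `DMStateStore`, statuses are plain strings, and `jobKey` is an assumed helper:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Names the job that changed but carries no status snapshot.
final class UpdatedJobEvent {
  final String flowGroup;
  final String flowName;
  final long flowExecutionId;
  final String jobName;

  UpdatedJobEvent(String flowGroup, String flowName, long flowExecutionId, String jobName) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.jobName = jobName;
  }

  // Assumed helper: a single lookup key built from the identifying fields.
  String jobKey() {
    return flowGroup + "/" + flowName + "/" + flowExecutionId + "/" + jobName;
  }
}

// Hypothetical in-memory stand-in for the state store.
final class InMemoryJobStatusStore {
  private final Map<String, String> statusByJobKey = new ConcurrentHashMap<>();

  void recordStatus(String jobKey, String status) {
    statusByJobKey.put(jobKey, status);
  }

  // The recipient always sees the latest recorded status, however old the event is.
  String currentStatus(UpdatedJobEvent event) {
    return statusByJobKey.get(event.jobKey());
  }
}
```

The cost of this approach is an extra store read per event; the benefit is that an event delayed in transit cannot deliver a status that has since been superseded.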
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,72 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+ protected final EventSubmitter eventSubmitter;
+ protected final AtomicLong orchestrationDelay;
Review Comment:
do all `DagProc`s need this or only some? ...also, not completely clear
what it is, so javadoc would help
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity, Optional<Dag<JobExecutionPlan>>> {
+
+ // should dag task be a part of dag proc?
+ private final KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ protected CancelEntity initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ Dag<JobExecutionPlan> dagToCancel = dagManagementStateStore.getDag(this.killDagTask.getDagId().toString());
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = new ArrayList<>(dagManagementStateStore.getJobs(this.killDagTask.getDagId().toString()));
+ return new CancelEntity(dagToCancel, dagNodesToCancel);
+ }
+
+ @Override
+ public Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, CancelEntity cancelEntity) throws IOException {
+ if (cancelEntity.dagToCancel == null || cancelEntity.dagNodesToCancel.isEmpty()) {
+ log.warn("No dag with id " + this.killDagTask.getDagId() + " found to kill");
+ return Optional.empty();
+ }
+ for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : cancelEntity.dagNodesToCancel) {
+ cancelDagNode(cancelEntity.dagToCancel, dagNodeToCancel, dagManagementStateStore);
+ }
+ return Optional.of(cancelEntity.dagToCancel);
+ }
+
+ private void cancelDagNode(Dag<JobExecutionPlan> dagToCancel, Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
+ Properties props = new Properties();
+ try {
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ }
+ if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+ dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+ DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), props);
+ dagToCancel.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
+ dagToCancel.setMessage("Flow killed by request");
+ dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getDagAction());
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
+ Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+ this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
+ }
+
+ @AllArgsConstructor
Review Comment:
could all the members be `final`?
why not encapsulate behavior here such as:
```
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : cancelEntity.dagNodesToCancel) {
  cancelDagNode(cancelEntity.dagToCancel, dagNodeToCancel, dagManagementStateStore);
}
```
as
```
/** @return how many canceled; @throws ...??? */
public int cancelAllDagNodes(DMStateStore stateStore);
```
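A rough sketch of how that encapsulation might look, with `final` members and the loop moved inside the entity. This is illustrative only: `StateStore` and `recordCancelled` are hypothetical stand-ins for the `DMStateStore` and the real per-node cancellation logic, and node ids are plain strings here:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the state store; it only records which nodes were cancelled.
interface StateStore {
  void recordCancelled(String qualifiedNodeId);
}

final class CancelEntity {
  private final String dagId;
  private final List<String> dagNodeIds;

  CancelEntity(String dagId, List<String> dagNodeIds) {
    this.dagId = dagId;
    // Defensive copy so the entity stays immutable after construction.
    this.dagNodeIds = Collections.unmodifiableList(new ArrayList<>(dagNodeIds));
  }

  /** @return how many nodes were cancelled */
  int cancelAllDagNodes(StateStore stateStore) {
    int cancelled = 0;
    for (String nodeId : dagNodeIds) {
      stateStore.recordCancelled(dagId + "/" + nodeId);
      cancelled++;
    }
    return cancelled;
  }
}
```

The caller in `act` would then shrink to a single `cancelAllDagNodes(...)` call, and the fields would no longer need to be reachable from outside the entity.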
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,62 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcFactory implements DagTaskVisitor {
+ @Inject private NewDagManager newDagManager;
+
+ @Override
+ public LaunchDagProc meet(LaunchDagTask launchDagTask) {
+ return new LaunchDagProc(launchDagTask, this.newDagManager);
Review Comment:
rather than constructing the proc w/ the DM, let's have the `DagProcEngine`
(or equiv) provide it during the `DagProc.process` method invocation
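As a sketch of that shape, assuming a template-method `process` on the proc: the engine owns the dependency and hands it over per invocation, so the proc constructor takes only its task data. The types are simplified stand-ins (`DagStore` is hypothetical, and dags are plain strings here):

```java
// Hypothetical stand-in for the dependency the engine owns.
interface DagStore {
  String getDag(String dagId);
}

abstract class DagProc<S, T> {
  // Template method: the dependency arrives at invocation time, not construction time.
  final T process(DagStore store) {
    S state = initialize(store);
    return act(store, state);
  }

  protected abstract S initialize(DagStore store);

  protected abstract T act(DagStore store, S state);
}

final class LaunchDagProc extends DagProc<String, Boolean> {
  private final String dagId;

  // No manager or store in the constructor; only the data identifying the work.
  LaunchDagProc(String dagId) {
    this.dagId = dagId;
  }

  @Override
  protected String initialize(DagStore store) {
    return store.getDag(dagId);
  }

  @Override
  protected Boolean act(DagStore store, String dag) {
    // Reports whether there was a dag to launch.
    return dag != null;
  }
}

final class DagProcEngine {
  private final DagStore store;

  DagProcEngine(DagStore store) {
    this.store = store;
  }

  <T> T run(DagProc<?, T> proc) {
    return proc.process(store);
  }
}
```

One consequence is that the factory no longer needs the injected manager at all, which also makes procs straightforward to construct in tests.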
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+ boolean containsDag(String dagId);
+ Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+ Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+ List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+ List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+ boolean addFailedDag(String dagId);
+ boolean existsFailedDag(String dagId);
+ boolean addCleanUpDag(String dagId);
+ boolean checkCleanUpDag(String dagId);
+ void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
Review Comment:
`addDagNodeState`? `addDagNodeToDag`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+ boolean containsDag(String dagId);
+ Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+ Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+ List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+ List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
Review Comment:
I realize a DagNode is roughly a Job, but for clarity, I'd still recommend
`getDagNodes` and `getEveryDagNode`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,62 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.
Review Comment:
There's no real need to mention that this is a visitor (it's kind of an impl detail), but
if you want to, I suggest:
> {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived class to its companion {@link DagProc} derived class

OR alternatively:
> for mapping from a specific type of {@link DagTask} to the {@link DagProc} class hierarchy
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity,
Optional<Dag<JobExecutionPlan>>> {
+
+ // should dag task be a part of dag proc?
+ private final KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
Review Comment:
the `DagTask`s are to be pojos w/ no "behavior", so it shouldn't be "used"
within, but if we want to access fields of it that's OK.
personally, I'd probably define members of this class, then do `KillDagTask`
field access in the `DagProcFactory` and finally pass those values into this
ctor. I believe unit tests would be clearest if we merely constructed
`KillDagProc` w/ a `dagId`, rather than needing to first create a `KillDagTask`.
nonetheless, if you really want to just retain the singular `KillDagTask`
and access it within, that's not the end of the world.
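A rough sketch of the field-extraction variant, using hypothetical simplified types (`KillTask`, `KillProc`, `ProcFactory`) in place of the real `KillDagTask`, `KillDagProc`, and `DagProcFactory`. The task stays a plain data holder, the factory reads its fields, and the proc can be built in a test from a bare `dagId`:

```java
// Hypothetical data-only task: fields and accessors, no behavior.
final class KillTask {
  private final String dagId;

  KillTask(String dagId) {
    this.dagId = dagId;
  }

  String getDagId() {
    return dagId;
  }
}

// Hypothetical proc: depends on the values it needs, not on the task type.
final class KillProc {
  private final String dagId;

  KillProc(String dagId) {
    this.dagId = dagId;
  }

  String describe() {
    return "kill " + dagId;
  }
}

// The factory is the only place that touches the task's fields.
final class ProcFactory {
  KillProc meet(KillTask task) {
    return new KillProc(task.getDagId());
  }
}
```

A unit test can then write `new KillProc("dag-2")` without first assembling a task and its lease status.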
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity,
Optional<Dag<JobExecutionPlan>>> {
+
+ // should dag task be a part of dag proc?
+ private final KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ protected CancelEntity initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ Dag<JobExecutionPlan> dagToCancel = dagManagementStateStore.getDag(this.killDagTask.getDagId().toString());
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = new ArrayList<>(dagManagementStateStore.getJobs(this.killDagTask.getDagId().toString()));
+ return new CancelEntity(dagToCancel, dagNodesToCancel);
+ }
+
+ @Override
+ public Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, CancelEntity cancelEntity) throws IOException {
+ if (cancelEntity.dagToCancel == null || cancelEntity.dagNodesToCancel.isEmpty()) {
+ log.warn("No dag with id " + this.killDagTask.getDagId() + " found to kill");
+ return Optional.empty();
Review Comment:
why not return `Optional` from `initialize` and put this check over there?
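One way this could look, as a sketch only: the types below (`CancelEntitySketch`, `KillSketch`, and a `Map` standing in for the state store) are hypothetical simplifications, not the PR's classes. The emptiness check moves into `initialize`, so `act` never has to re-validate its input:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for KillDagProc.CancelEntity.
final class CancelEntitySketch {
  final String dagId;
  final List<String> nodeIds;

  CancelEntitySketch(String dagId, List<String> nodeIds) {
    this.dagId = dagId;
    this.nodeIds = nodeIds;
  }
}

final class KillSketch {
  // The "nothing to cancel" check lives here, so a present value is always usable.
  static Optional<CancelEntitySketch> initialize(Map<String, List<String>> store, String dagId) {
    List<String> nodeIds = store.get(dagId);
    if (nodeIds == null || nodeIds.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(new CancelEntitySketch(dagId, nodeIds));
  }

  // Returns how many nodes would be cancelled; 0 when initialize found nothing.
  static int act(Optional<CancelEntitySketch> entity) {
    return entity.map(e -> e.nodeIds.size()).orElse(0);
  }
}
```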
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link
org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+ private final LaunchDagTask launchDagTask;
+ NewDagManager newDagManager;
+
+ public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager
newDagManager) {
+ this.launchDagTask = launchDagTask;
+ this.newDagManager = newDagManager;
+ }
+
+ @Override
+ protected Dag<JobExecutionPlan> initialize(DagManagementStateStore
dagManagementStateStore) throws IOException {
+ return
dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+ }
+
+ @Override
+ protected Void act(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag) throws IOException {
+ if (dag == null) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+ return null;
+ }
Review Comment:
prefer in `initialize`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link
org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.
+ */
+public interface DagManagementStateStore {
+ Dag<JobExecutionPlan> getDag(String dagId);
+ Dag<JobExecutionPlan> addDag(String dagId, Dag<JobExecutionPlan> dag);
+ boolean containsDag(String dagId);
+ Dag.DagNode<JobExecutionPlan> getDagNode(String dagNodeId);
+ Dag<JobExecutionPlan> getDagForJob(Dag.DagNode<JobExecutionPlan> dagNode);
+ List<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+ List<Dag.DagNode<JobExecutionPlan>> getAllJobs() throws IOException;
+ boolean addFailedDag(String dagId);
+ boolean existsFailedDag(String dagId);
+ boolean addCleanUpDag(String dagId);
+ boolean checkCleanUpDag(String dagId);
+ void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+ void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+ void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;
Review Comment:
`releaseDagAction` / `deleteDagAction` / `removeDagAction`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java:
##########
@@ -0,0 +1,113 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+
+/**
+ * An implementation of {@link DagProc} for killing {@link DagTask}.
+ */
+@Slf4j
+@Alpha
+public final class KillDagProc extends DagProc<KillDagProc.CancelEntity,
Optional<Dag<JobExecutionPlan>>> {
+
+ // should dag task be a part of dag proc?
+ private final KillDagTask killDagTask;
+
+ public KillDagProc(KillDagTask killDagTask) {
+ this.killDagTask = killDagTask;
+ }
+
+ protected CancelEntity initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
+ Dag<JobExecutionPlan> dagToCancel = dagManagementStateStore.getDag(this.killDagTask.getDagId().toString());
+ List<Dag.DagNode<JobExecutionPlan>> dagNodesToCancel = new ArrayList<>(dagManagementStateStore.getJobs(this.killDagTask.getDagId().toString()));
Review Comment:
why construct the `ArrayList`?... I thought the state store already returned
one
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link
org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+ private final LaunchDagTask launchDagTask;
+ NewDagManager newDagManager;
+
+ public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager
newDagManager) {
+ this.launchDagTask = launchDagTask;
+ this.newDagManager = newDagManager;
+ }
+
+ @Override
+ protected Dag<JobExecutionPlan> initialize(DagManagementStateStore
dagManagementStateStore) throws IOException {
+ return
dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+ }
+
+ @Override
+ protected Void act(DagManagementStateStore dagManagementStateStore,
Dag<JobExecutionPlan> dag) throws IOException {
+ if (dag == null) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to
launch");
+ return null;
+ }
+ initializeDag(dag, dagManagementStateStore);
+ DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
+ return null;
+ }
+
+ protected void initializeDag(Dag<JobExecutionPlan> dag,
DagManagementStateStore dagManagementStateStore)
+ throws IOException {
+ //Add Dag to the map of running dags
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+ log.info("Initializing Dag {}",
DagManagerUtils.getFullyQualifiedDagName(dag));
+
+ //A flag to indicate if the flow is already running.
+ boolean isDagRunning = false;
+ //Are there any jobs already in the running state? This check is for Dags
already running
+ //before a leadership change occurs.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ if (DagManagerUtils.getExecutionStatus(dagNode) ==
ExecutionStatus.RUNNING) {
+ dagManagementStateStore.addJobState(dagId, dagNode);
+ //Update the running jobs counter.
+
NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+ isDagRunning = true;
+ }
+ }
+
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
+ NewDagManager.getDagManagerMetrics().registerFlowMetric(flowId, dag);
+
+ log.debug("Dag {} submitting jobs ready for execution.",
DagManagerUtils.getFullyQualifiedDagName(dag));
+ //Determine the next set of jobs to run and submit them for execution
+ Map<String, Set<Dag.DagNode<JobExecutionPlan>>> nextSubmitted =
submitNext(dagManagementStateStore, dagId);
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextSubmitted.get(dagId)) {
+ dagManagementStateStore.addJobState(dagId, dagNode);
+ }
+
+ // Set flow status to running
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag,
TimingEvent.FlowTimings.FLOW_RUNNING);
+ NewDagManager.getDagManagerMetrics().conditionallyMarkFlowAsState(flowId,
DagManager.FlowState.RUNNING);
+
+ // Report the orchestration delay the first time the Dag is initialized.
Orchestration delay is defined as
+ // the time difference between the instant when a flow first transitions
to the running state and the instant
+ // when the flow is submitted to Gobblin service.
+ if (!isDagRunning) {
+ this.orchestrationDelay.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag));
+ }
+
+ log.info("Dag {} Initialization complete.",
DagManagerUtils.getFullyQualifiedDagName(dag));
+ }
+
+ /**
+ * Submit next set of Dag nodes in the Dag identified by the provided dagId
+ * @param dagId The dagId that should be processed.
+ * @return
+ * @throws IOException
+ */
+ // todo - convert to return set only
+ synchronized Map<String, Set<Dag.DagNode<JobExecutionPlan>>>
submitNext(DagManagementStateStore dagManagementStateStore,
+ String dagId) throws IOException {
+ Dag<JobExecutionPlan> dag = dagManagementStateStore.getDag(dagId);
+ Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
+ List<String> nextJobNames = new ArrayList<>();
+
+ //Submit jobs from the dag ready for execution.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
+ submitJob(dagManagementStateStore, dagNode);
+ nextJobNames.add(DagManagerUtils.getJobName(dagNode));
+ }
+
+ log.info("Submitting next nodes for dagId {}, where next jobs to be
submitted are {}", dagId, nextJobNames);
+ //Checkpoint the dag state
+ newDagManager.getDagStateStore().writeCheckpoint(dag);
+
+ Map<String, Set<Dag.DagNode<JobExecutionPlan>>> dagIdToNextJobs =
Maps.newHashMap();
+ dagIdToNextJobs.put(dagId, nextNodes);
+ return dagIdToNextJobs;
+ }
+
+ /**
+ * Submits a {@link JobSpec} to a {@link
org.apache.gobblin.runtime.api.SpecExecutor}.
+ */
+ private void submitJob(DagManagementStateStore dagManagementStateStore,
Dag.DagNode<JobExecutionPlan> dagNode) {
+ DagManagerUtils.incrementJobAttempt(dagNode);
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ jobExecutionPlan.setExecutionStatus(ExecutionStatus.RUNNING);
+ JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+
+ // Run this spec on selected executor
+ SpecProducer<Spec> producer;
+ try {
+ this.newDagManager.getUserQuotaManager().checkQuota(Collections.singleton(dagNode));
+ producer = DagManagerUtils.getSpecProducer(dagNode);
+ TimingEvent jobOrchestrationTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+
+ // Increment job count before submitting the job onto the spec producer, in case that throws an exception.
+ // By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
+ // Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
+ // When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
+ // Ensure that we do not double increment for flows that are retried
+ if (dagNode.getValue().getCurrentAttempts() == 1) {
+ NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
+ }
+ // Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
+ // The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
+ // either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
+ // blocks (by calling Future#get()) until the submission is completed.
+ Future<?> addSpecFuture = producer.addSpec(jobSpec);
+ dagNode.getValue().setJobFuture(Optional.of(addSpecFuture));
+ //Persist the dag
+ newDagManager.getDagStateStore().writeCheckpoint(dagManagementStateStore.getDag(DagManagerUtils.generateDagId(dagNode).toString()));
Review Comment:
again, why keep so much state floating around, rather than consolidate
it into a singular `facade` we can easily replace, all-or-nothing?
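To illustrate the facade idea, here is a small sketch under assumed names: `DagLookup`, `CheckpointWriter`, `QuotaChecker`, `StateFacade`, and `SubmitSketch` are all hypothetical and only mirror the kinds of calls made in the hunk above (quota check, dag lookup, checkpoint write). The calling code sees one object, which can be swapped out as a whole:

```java
// Hypothetical collaborators that are currently reached through separate objects.
interface DagLookup {
  String getDag(String dagId);
}

interface CheckpointWriter {
  void writeCheckpoint(String dag);
}

interface QuotaChecker {
  void checkQuota(String dagId);
}

// Hypothetical facade: the single entry point for all state-related operations.
final class StateFacade {
  private final DagLookup lookup;
  private final CheckpointWriter checkpoints;
  private final QuotaChecker quota;

  StateFacade(DagLookup lookup, CheckpointWriter checkpoints, QuotaChecker quota) {
    this.lookup = lookup;
    this.checkpoints = checkpoints;
    this.quota = quota;
  }

  void checkQuota(String dagId) {
    quota.checkQuota(dagId);
  }

  // Looks up the dag and persists it, so callers need not chain the two stores themselves.
  void checkpoint(String dagId) {
    checkpoints.writeCheckpoint(lookup.getDag(dagId));
  }
}

// Caller code touches only the facade.
final class SubmitSketch {
  static void submit(StateFacade state, String dagId) {
    state.checkQuota(dagId);
    state.checkpoint(dagId);
  }
}
```

Replacing the facade's implementation then replaces quota handling, lookup, and checkpointing together, without touching the procs.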
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java:
##########
@@ -0,0 +1,40 @@
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle kill tasks.
+ */
+@Alpha
+public class KillDagTask extends DagTask {
+ public KillDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+ super(dagAction, leaseObtainedStatus);
+ }
Review Comment:
isn't there a lombok annotation that will synthesize this plus call super as
well?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link
org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+ private final LaunchDagTask launchDagTask;
+ NewDagManager newDagManager;
+
+ public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager
newDagManager) {
+ this.launchDagTask = launchDagTask;
+ this.newDagManager = newDagManager;
+ }
+
+ @Override
+ protected Dag<JobExecutionPlan> initialize(DagManagementStateStore
dagManagementStateStore) throws IOException {
+ return
dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
+ }
+
+ @Override
+ protected Void act(DagManagementStateStore dagManagementStateStore,
Dag<JobExecutionPlan> dag) throws IOException {
+ if (dag == null) {
+ log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to
launch");
+ return null;
+ }
+ initializeDag(dag, dagManagementStateStore);
+ DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
+ return null;
+ }
+
+ protected void initializeDag(Dag<JobExecutionPlan> dag,
DagManagementStateStore dagManagementStateStore)
+ throws IOException {
+ //Add Dag to the map of running dags
+ String dagId = DagManagerUtils.generateDagId(dag).toString();
+ log.info("Initializing Dag {}",
DagManagerUtils.getFullyQualifiedDagName(dag));
+
+ //A flag to indicate if the flow is already running.
+ boolean isDagRunning = false;
+ //Are there any jobs already in the running state? This check is for Dags
already running
+ //before a leadership change occurs.
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ if (DagManagerUtils.getExecutionStatus(dagNode) ==
ExecutionStatus.RUNNING) {
+ dagManagementStateStore.addJobState(dagId, dagNode);
+ //Update the running jobs counter.
+
NewDagManager.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
Review Comment:
Couldn't these *stateful* metrics be encapsulated within the
`DagManagementStateStore`?
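One way to read this suggestion, as a minimal sketch only: the store updates its own running-job count whenever job state is added or removed, so callers never touch the metrics directly. `SketchStateStore`, its string-keyed methods, and `getRunningJobCount` are hypothetical stand-ins, not the PR's `DagManagementStateStore` API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for illustration; not the PR's DagManagementStateStore.
class SketchStateStore {
  private final Map<String, Set<String>> dagToRunningJobs = new HashMap<>();
  private int runningJobCount = 0;

  // Recording a running job and updating the counter happen in one place.
  synchronized void addJobState(String dagId, String jobName) {
    Set<String> jobs = dagToRunningJobs.computeIfAbsent(dagId, k -> new HashSet<>());
    if (jobs.add(jobName)) {
      runningJobCount++;
    }
  }

  // Removing a job decrements the counter only if the job was actually tracked.
  synchronized void deleteJobState(String dagId, String jobName) {
    Set<String> jobs = dagToRunningJobs.get(dagId);
    if (jobs != null && jobs.remove(jobName)) {
      runningJobCount--;
    }
  }

  synchronized int getRunningJobCount() {
    return runningJobCount;
  }
}
```

With this shape, a caller such as `initializeDag` would only call `addJobState`, and the counter could not drift from the stored state.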
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,205 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecProducer;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.NewDagManager;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link
org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Dag<JobExecutionPlan>, Void> {
+ private final LaunchDagTask launchDagTask;
+ NewDagManager newDagManager;
+
+ public LaunchDagProc(LaunchDagTask launchDagTask, NewDagManager
newDagManager) {
+ this.launchDagTask = launchDagTask;
+ this.newDagManager = newDagManager;
+ }
+
+ @Override
+ protected Dag<JobExecutionPlan> initialize(DagManagementStateStore
dagManagementStateStore) throws IOException {
+ return
dagManagementStateStore.getDag(this.launchDagTask.getDagId().toString());
Review Comment:
Again, it would be much clearer to have the constructor take just the `dagId`,
rather than a specific `DagTask` type.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it
will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter}
as complete
+ */
+
+@Alpha
+public abstract class DagTask {
+ @Getter public DagActionStore.DagAction dagAction;
+ private MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus;
+ @Getter DagManager.DagId dagId;
+
+ public DagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseObtainedStatus) {
+ this.dagAction = dagAction;
+ this.leaseObtainedStatus = leaseObtainedStatus;
+ this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId());
+ }
+
+ public abstract DagProc host(DagTaskVisitor<DagProc> visitor);
Review Comment:
This should be:
```
public abstract <T> T host(DagTaskVisitor<T> visitor);
```
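A minimal, self-contained sketch of how a generic `host` could look. Note that Java requires a method-level type parameter to follow the modifiers and precede the return type. The `Sketch*` classes and the `meet` method name are hypothetical stand-ins, not the PR's actual `DagTask`, `DagTaskVisitor`, or `DagProc`.

```java
// Simplified stand-ins for illustration only; not the PR's real classes.
interface SketchDagTaskVisitor<T> {
  T meet(SketchLaunchDagTask task);
  T meet(SketchKillDagTask task);
}

abstract class SketchDagTask {
  // The method-level type parameter lets each visitor choose its own result type.
  public abstract <T> T host(SketchDagTaskVisitor<T> visitor);
}

class SketchLaunchDagTask extends SketchDagTask {
  @Override
  public <T> T host(SketchDagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

class SketchKillDagTask extends SketchDagTask {
  @Override
  public <T> T host(SketchDagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

// A visitor that yields a String; another visitor could yield a proc object instead.
class SketchNamingVisitor implements SketchDagTaskVisitor<String> {
  @Override
  public String meet(SketchLaunchDagTask task) {
    return "launch";
  }

  @Override
  public String meet(SketchKillDagTask task) {
    return "kill";
  }
}
```

The task hierarchy then stays independent of any single result type, so a visitor producing procs and a visitor producing, say, log labels can coexist.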
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task in a Dag.
+ * Upon completion of the {@link DagProc#process(DagManagementStateStore)} it
will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter}
as complete
+ */
+
+@Alpha
+public abstract class DagTask {
+ @Getter public DagActionStore.DagAction dagAction;
Review Comment:
I'm not sure who should be accessing this... who do you have in mind?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Singleton;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagProcessingEngine} would
pull from for processing.
+ * Implements {@link Iterator} to provide {@link DagTask}s as soon as it's
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagTaskStream implements Iterator<DagTask>{
+ private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new
LinkedBlockingQueue<>();
+
+ @Override
+ public boolean hasNext() {
+ return !this.dagActionQueue.isEmpty();
+ }
+
+ @Override
+ public DagTask next() {
+ try {
+ DagActionStore.DagAction dagAction = this.dagActionQueue.take();
//`take` blocks until an element is available
+ // todo reconsider the use of MultiActiveLeaseArbiter
+ //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+ // todo - uncomment after flow trigger handler provides such an api
+ //Properties jobProps = getJobProperties(dagAction);
+ //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction,
System.currentTimeMillis());
+ //if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ // can it return null? is this iterator allowed to return null?
+ return createDagTask(dagAction, new
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction,
System.currentTimeMillis()));
Review Comment:
I understand punting on the actual lease arbitration, but for clarity I'd
suggest modeling the fact that not every lease is necessarily obtained, e.g.
with a private method `acquireLease` that returns either the lease status base
type or an `Optional<LeaseObtainedStatus>`. Then have this method call
`createDagTask` only if the lease was obtained.
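A rough sketch of that shape, under stated assumptions: `SketchLease`, `SketchDagTaskStream`, `nextTask`, and the `leaseAvailable` flag are hypothetical stand-ins for the real lease arbitration, and `java.util.Optional` is used here rather than the Guava `Optional` imported elsewhere in the PR.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins for illustration only; not the PR's real classes.
class SketchLease {
  final String dagAction;
  final long acquiredAtMillis;

  SketchLease(String dagAction, long acquiredAtMillis) {
    this.dagAction = dagAction;
    this.acquiredAtMillis = acquiredAtMillis;
  }
}

class SketchDagTaskStream {
  private final BlockingQueue<String> dagActionQueue = new LinkedBlockingQueue<>();
  // Hypothetical switch standing in for real lease arbitration.
  private final boolean leaseAvailable;

  SketchDagTaskStream(boolean leaseAvailable) {
    this.leaseAvailable = leaseAvailable;
  }

  void addDagAction(String dagAction) {
    dagActionQueue.add(dagAction);
  }

  // Placeholder: a real implementation would consult the lease arbiter here.
  private Optional<SketchLease> acquireLease(String dagAction) {
    return leaseAvailable
        ? Optional.of(new SketchLease(dagAction, System.currentTimeMillis()))
        : Optional.empty();
  }

  // Blocks until an action is queued; yields a task only when the lease was obtained.
  Optional<String> nextTask() {
    try {
      String dagAction = dagActionQueue.take();
      return acquireLease(dagAction).map(lease -> "task for " + lease.dagAction);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return Optional.empty();
    }
  }
}
```

Even while arbitration is stubbed out, the empty case stays visible in the signature, so callers have to handle a lease that was not obtained.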
Issue Time Tracking
-------------------
Worklog Id: (was: 904486)
Time Spent: 17h 50m (was: 17h 40m)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 17h 50m
> Remaining Estimate: 0h
>