umustafi commented on code in PR #3825:
URL: https://github.com/apache/gobblin/pull/3825#discussion_r1432032055


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReloadDagProc.java:
##########


Review Comment:
   see comment on `ReloadDagTask`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+
+
+public interface DagManagement {
+  DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap);
+  void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+  void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+  /**
+   * defines what to do when a job (dag node) finishes
+   * @param dagNode dag node that finished
+   * @return next set of DagNodes to run
+   * @throws IOException
+   */
+  Map<String, Set<Dag.DagNode<JobExecutionPlan>>> onJobFinish(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+  void removeDagActionFromStore(DagActionStore.DagAction dagAction) throws IOException;
+  void handleJobStatusEvent(JobStatusEvent jobStatusEvent);
+  void handleKillFlowEvent(KillFlowEvent killFlowEvent);
+  void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) throws IOException;

Review Comment:
   These are the core responsibilities of `dagManagement`: handling a new dag action (launch/resume/kill, enforcing SLAs, ...) and adding the resulting task to the stream. The other methods below should be in `dagManagementStateStore`, and the `dagProc`s should interact with `dagManagementStateStore`.
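   A minimal sketch of that split, using plain Java stand-ins instead of the Gobblin types. Every name below (`DagActionKind`, `DagTaskSketch`, `DagManagementSketch`, `StreamingDagManagement`) is hypothetical. The narrowed `DagManagement` only turns incoming actions into tasks on a stream and holds no dag state; loading and saving state is left to the state store that the `DagProc`s use.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical action kinds; the real set would come from DagActionStore.
enum DagActionKind { LAUNCH, KILL, RESUME, ENFORCE_SLA }

// Hypothetical task placed on the stream: just the dag id and what to do with it.
final class DagTaskSketch {
  final String dagId;
  final DagActionKind kind;

  DagTaskSketch(String dagId, DagActionKind kind) {
    this.dagId = dagId;
    this.kind = kind;
  }
}

// Hypothetical narrowed DagManagement: only reacts to actions, no state-store methods.
interface DagManagementSketch {
  void handleDagAction(String dagId, DagActionKind kind);
}

final class StreamingDagManagement implements DagManagementSketch {
  // Stand-in for the dagTaskStream that the processing engine would consume.
  final Queue<DagTaskSketch> dagTaskStream = new ArrayDeque<>();

  @Override
  public void handleDagAction(String dagId, DagActionKind kind) {
    // No dag is loaded here; the DagProc handling the task loads what it needs.
    dagTaskStream.add(new DagTaskSketch(dagId, kind));
  }
}
```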



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ * and allows add/delete and other functions.

Review Comment:
   Add some detail to the Javadoc: this store loads, updates, and saves any state relating to a dag, and it is used by the `dagProcs`.
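   One possible shape for that Javadoc, sketched as a stand-alone interface. The method names (`checkpointDag`, `getDag`, `deleteDag`) and the in-memory implementation are hypothetical illustrations rather than the PR's API, and dags are plain strings so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Loads, updates and saves any state relating to a dag (the dag itself and, in a
 * fuller version, its nodes and job states). Used by the {@code DagProc}s, which read
 * the state they need when processing a task and write back any changes.
 *
 * <p>Hypothetical sketch: dags are serialized strings keyed by dag id.
 */
interface DagManagementStateStoreSketch {
  /** Saves (or overwrites) the current state of the dag. */
  void checkpointDag(String dagId, String serializedDag);

  /** Loads the dag, or returns empty if nothing is stored for this id. */
  Optional<String> getDag(String dagId);

  /** Removes all state for the dag, e.g. once it has finished. */
  void deleteDag(String dagId);
}

// Hypothetical in-memory implementation, for illustration only.
final class InMemoryDagManagementStateStore implements DagManagementStateStoreSketch {
  private final Map<String, String> dags = new HashMap<>();

  @Override
  public void checkpointDag(String dagId, String serializedDag) {
    dags.put(dagId, serializedDag);
  }

  @Override
  public Optional<String> getDag(String dagId) {
    return Optional.ofNullable(dags.get(dagId));
  }

  @Override
  public void deleteDag(String dagId) {
    dags.remove(dagId);
  }
}
```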



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/RetryDagTask.java:
##########


Review Comment:
   We can remove this task type, as well as its Proc, because we load tasks as needed whenever a `dagAction` is pending. A steady-state active host (or a newly starting one) will load the dag from the dag state store after acquiring the lease for the dagTask (the one that was added to the dagTaskStream).
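   A minimal sketch of that load-on-demand behaviour, assuming lease arbitration can be modelled as "the first host to claim a task id wins" (here a `ConcurrentHashMap.putIfAbsent`; a real arbiter would be backed by a shared store). `LeaseArbiterSketch` and `DagTaskHost` are hypothetical names. Because whichever host wins the lease reloads the dag from the state store, no separate retry task is needed after a restart.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical lease arbiter: the first host to claim a task id owns it.
final class LeaseArbiterSketch {
  private final ConcurrentHashMap<String, String> leases = new ConcurrentHashMap<>();

  boolean tryAcquire(String taskId, String hostId) {
    return leases.putIfAbsent(taskId, hostId) == null;
  }
}

// Hypothetical host that only loads a dag after winning the lease for its task.
final class DagTaskHost {
  private final String hostId;
  private final LeaseArbiterSketch arbiter;
  private final Map<String, String> dagStateStore; // dagId -> serialized dag

  DagTaskHost(String hostId, LeaseArbiterSketch arbiter, Map<String, String> dagStateStore) {
    this.hostId = hostId;
    this.arbiter = arbiter;
    this.dagStateStore = dagStateStore;
  }

  /** Returns the dag to act on, or empty if another host holds the lease or the dag is gone. */
  Optional<String> onDagTask(String taskId, String dagId) {
    if (!arbiter.tryAcquire(taskId, hostId)) {
      return Optional.empty();
    }
    // Nothing is cached between tasks: the dag is read fresh from the store.
    return Optional.ofNullable(dagStateStore.get(dagId));
  }
}
```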



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManager.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KillFlowEvent;
+import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
+import org.apache.gobblin.service.monitoring.event.JobStatusEvent;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) manages {@link Dag}s through {@link DagManagementStateStore}.
+ * b) subscribes to {@link JobStatusEvent} sent by {@link KafkaJobStatusMonitor}
+ * c) spawns a {@link KillDagThread} that enforces run time and job start time deadlines.
+ * d) spawns a {@link DagManager.FailedDagRetentionThread} that cleans failed dags.
+ * e) load {@link Dag}s on service-start / set-active.
+ */
+@Slf4j
+public class NewDagManager implements DagManagement {

Review Comment:
   If these methods already exist elsewhere, we should remove them, or see whether we can abstract away unnecessary methods and functionality. We can clean up after ironing out the interface for everything else. For now, let's keep b).
   Instead of c), for start/kill SLA enforcement we need to send some action to revisit the flow and check whether it has started/completed. All hosts need to know about it, so it must be durably stored somewhere (a timer). Only if the SLA is breached do we add a dagAction to kill. On `launchDagAction`, durably store `setStartSlaTimer` AND `setCompletionSlaTimer` somewhere and set the timer for yourself (on restart we can load these in, or have another stream for them). When the timers go off on all hosts, do lease arbitration to enforce the SLA and emit a `cancelBCStartSlaBreached/CompletionSlaBreached` action.
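   A minimal in-process sketch of the timer scheme described above. It assumes a shared map stands in for the durable timer store, `putIfAbsent` stands in for lease arbitration, and a list stands in for the dagActionStore; `SlaTimerSketch`, its methods, and the emitted action strings are all hypothetical. Every host calls `onTimerFired` when its local timer expires, but only the lease winner emits the kill action, so it is emitted once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of durably stored SLA timers plus lease arbitration on expiry.
final class SlaTimerSketch {
  enum SlaType { START, COMPLETION }

  // Stand-in for a durable store every host can read: timer key -> deadline (epoch ms).
  final Map<String, Long> durableTimers = new ConcurrentHashMap<>();
  // Stand-in for lease arbitration: the first host to claim a timer key wins.
  private final Map<String, String> leases = new ConcurrentHashMap<>();
  // Stand-in for the dagActionStore: kill actions emitted on an SLA breach.
  final List<String> emittedActions = Collections.synchronizedList(new ArrayList<String>());

  static String key(String dagId, SlaType type) {
    return dagId + ":" + type;
  }

  /** On a launch dag action, durably record both deadlines (each host also sets a local timer). */
  void onLaunch(String dagId, long nowMs, long startSlaMs, long completionSlaMs) {
    durableTimers.put(key(dagId, SlaType.START), nowMs + startSlaMs);
    durableTimers.put(key(dagId, SlaType.COMPLETION), nowMs + completionSlaMs);
  }

  /**
   * Invoked on every host when its local timer fires. {@code slaMet} says whether the
   * flow has already started (START) or completed (COMPLETION).
   */
  void onTimerFired(String hostId, String dagId, SlaType type, long nowMs, boolean slaMet) {
    String k = key(dagId, type);
    Long deadline = durableTimers.get(k);
    if (deadline == null || nowMs < deadline || slaMet) {
      return; // already handled, not yet due, or SLA satisfied
    }
    if (leases.putIfAbsent(k, hostId) == null) {
      // Only the lease winner emits the kill action and clears the timer.
      String action = type == SlaType.START
          ? "cancelBCStartSlaBreached:" : "cancelBCCompletionSlaBreached:";
      emittedActions.add(action + dagId);
      durableTimers.remove(k);
    }
  }
}
```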



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagProc} for launching {@link org.apache.gobblin.service.modules.orchestration.task.DagTask}.
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<LaunchDagTask> {
+  private final LaunchDagTask launchDagTask;
+  private Dag<JobExecutionPlan> dag;
+
+  public LaunchDagProc(LaunchDagTask launchDagTask, DagProcFactory dagProcFactory) {
+    super(dagProcFactory);
+    this.launchDagTask = launchDagTask;
+  }
+
+  @Override
+  protected void initialize() throws IOException {
+    this.dag = this.dagManager.getDag(this.launchDagTask.getDagId().toString());
+    if (this.dag == null) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return;
+    }
+    initializeDag(this.dag);
+  }
+
+  @Override
+  protected void act() throws IOException {
+    if (this.dag == null) {
+      log.warn("No dag with id " + this.launchDagTask.getDagId() + " found to launch");
+      return;
+    }
+    this.dagStateStore.writeCheckpoint(this.dag);
+    for (Dag.DagNode<JobExecutionPlan> dagNode : this.dag.getStartNodes()) {
+      this.dagProcFactory.dagProcessingEngine.addAdvanceDagAction(dagNode);

Review Comment:
   Overall flow: a `dagAction` is stored in the `dagActionStore` -> the `dagActionStoreMonitor` receives the action and adds a task to the `dagTaskStream` -> the `dagProcessingEngine` iterates through the tasks and "acts" on each one, doing the work via a `dagProc`.
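   The same pipeline as a single-process sketch. The real store, monitor and stream are separate components (and the store would be durable), so `DagFlowSketch` and all of its members are hypothetical stand-ins that only show the order of hand-offs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of: dagActionStore -> dagActionStoreMonitor -> dagTaskStream
// -> dagProcessingEngine -> dagProc.
final class DagFlowSketch {
  enum ActionType { LAUNCH, KILL, RESUME }

  static final class DagAction {
    final String dagId;
    final ActionType type;

    DagAction(String dagId, ActionType type) {
      this.dagId = dagId;
      this.type = type;
    }
  }

  /** Does the actual work for one task. */
  interface DagProc {
    void process(DagAction task);
  }

  final List<DagAction> dagActionStore = new ArrayList<>();
  final BlockingQueue<DagAction> dagTaskStream = new LinkedBlockingQueue<>();
  final List<String> processed = new ArrayList<>();

  /** 1) Persist the action; the store then notifies its monitor. */
  void storeDagAction(DagAction action) {
    dagActionStore.add(action);
    onDagActionStoreChange(action);
  }

  /** 2) The monitor turns each stored action into a task on the stream. */
  void onDagActionStoreChange(DagAction action) {
    dagTaskStream.add(action);
  }

  /** 3) Pick the DagProc that knows how to act on this task type. */
  DagProc procFor(DagAction task) {
    switch (task.type) {
      case LAUNCH:
        return t -> processed.add("launch " + t.dagId);
      case KILL:
        return t -> processed.add("kill " + t.dagId);
      default:
        return t -> processed.add("resume " + t.dagId);
    }
  }

  /** 4) The engine drains the stream and acts on each task via its DagProc. */
  void runEngineOnce() {
    DagAction task;
    while ((task = dagTaskStream.poll()) != null) {
      procFor(task).process(task);
    }
  }
}
```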



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
+    for (Dag.DagNode<JobExecutionPlan> dagNode : this.dag.getStartNodes()) {

Review Comment:
   Two options for how to deal with a multi-hop flow:
   1) The way you have it here:
   - update the status in the dagStateStore
   - then create a separate dagAction for each dagNode
   2) Launch one dagNode at a time; upon completion, check for more nodes to launch and add them one by one, committing each time.
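   A small model contrasting the two options, under one reading of option 2 (ready nodes are added one at a time with a commit after each, and every completion triggers a fresh check). `MultiHopSketch` is hypothetical: "checkpoint" stands in for writing to the dagStateStore and `advance:<node>` for a per-node dagAction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the two multi-hop launch strategies.
final class MultiHopSketch {
  // node -> its parents; LinkedHashMap keeps iteration order deterministic.
  private final Map<String, List<String>> parents = new LinkedHashMap<>();
  private final Set<String> launched = new HashSet<>();
  private final Set<String> completed = new HashSet<>();
  final List<String> emittedActions = new ArrayList<>();
  int checkpoints = 0;

  void addNode(String node, List<String> nodeParents) {
    parents.put(node, nodeParents);
  }

  /** Nodes not yet launched whose parents have all completed. */
  private List<String> readyNodes() {
    List<String> ready = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : parents.entrySet()) {
      if (!launched.contains(e.getKey()) && completed.containsAll(e.getValue())) {
        ready.add(e.getKey());
      }
    }
    return ready;
  }

  /** Option 1: checkpoint once, then create a separate dagAction for each ready node. */
  void launchReadyAsBatch() {
    checkpoints++;
    for (String node : readyNodes()) {
      launched.add(node);
      emittedActions.add("advance:" + node);
    }
  }

  /** Option 2: add ready nodes one by one, committing (checkpointing) after each. */
  void launchReadyOneByOne() {
    for (String node : readyNodes()) {
      launched.add(node);
      emittedActions.add("advance:" + node);
      checkpoints++;
    }
  }

  /** Under option 2, each completion triggers a check for newly launchable nodes. */
  void onNodeCompleted(String node) {
    completed.add(node);
    launchReadyOneByOne();
  }
}
```

   Option 1 commits less often; option 2 commits after every hand-off, so a restart can resume from the last committed node.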



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
