[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=883014&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-883014
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/23 23:09
Start Date: 02/Oct/23 23:09
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343192925
##########
gobblin-rest-service/gobblin-rest-api/src/main/snapshot/org.apache.gobblin.rest.jobExecutions.snapshot.json:
##########
Review Comment:
Is this meant to be part of your change, or was it included by mistake?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+ public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+ public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+ public boolean hasRunningJobs(String dagId);
+
+ public void removeDagActionFromStore(DagManager.DagId dagId, DagActionStore.FlowActionType flowActionType) throws IOException;
+
+ public void addDagSLA(String dagId, Long flowSla);
+
+ public Long getDagSLA(String dagId);
Review Comment:
Are these dagStart or dagKill deadlines? Both are configurable, so we may want to create separate functions for each.
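A minimal sketch of what separate functions per deadline kind could look like. `DagDeadlineStore`, `InMemoryDagDeadlineStore` and all method names are hypothetical, and the sketch assumes both deadlines are stored as epoch milliseconds. Getters return `Optional` rather than null.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical split of addDagSLA/getDagSLA into two deadline kinds; names are illustrative only.
interface DagDeadlineStore {
  void addDagStartDeadline(String dagId, long deadlineMillis);

  Optional<Long> getDagStartDeadline(String dagId);

  void addDagKillDeadline(String dagId, long deadlineMillis);

  Optional<Long> getDagKillDeadline(String dagId);
}

class InMemoryDagDeadlineStore implements DagDeadlineStore {
  // One map per deadline kind, so a start deadline can never be read back as a kill deadline.
  private final Map<String, Long> dagIdToStartDeadline = new ConcurrentHashMap<>();
  private final Map<String, Long> dagIdToKillDeadline = new ConcurrentHashMap<>();

  @Override
  public void addDagStartDeadline(String dagId, long deadlineMillis) {
    // putIfAbsent keeps the first value registered, as addDagSLA does in the PR.
    dagIdToStartDeadline.putIfAbsent(dagId, deadlineMillis);
  }

  @Override
  public Optional<Long> getDagStartDeadline(String dagId) {
    return Optional.ofNullable(dagIdToStartDeadline.get(dagId));
  }

  @Override
  public void addDagKillDeadline(String dagId, long deadlineMillis) {
    dagIdToKillDeadline.putIfAbsent(dagId, deadlineMillis);
  }

  @Override
  public Optional<Long> getDagKillDeadline(String dagId) {
    return Optional.ofNullable(dagIdToKillDeadline.get(dagId));
  }
}
```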
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String NUM_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+ private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+ private static final long DEFAULT_RETRY_DELAY_MS = 1000;
Review Comment:
Use `millis` to differentiate from `micros`, and also add the time unit to the polling interval.
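A minimal sketch of the defaults with the unit in each name. The PR does not state the polling interval's unit, so seconds is an assumption. The class name `DagProcessingEngineDefaults` and its `Duration` accessors are hypothetical.

```java
import java.time.Duration;

// Hypothetical renaming of the DagProcessingEngine defaults so each name carries its unit.
// The polling interval's unit is not stated in the PR; SECONDS here is an assumption.
final class DagProcessingEngineDefaults {
  static final int DEFAULT_NUM_THREADS = 3;
  static final long DEFAULT_JOB_STATUS_POLLING_INTERVAL_SECONDS = 10;
  static final int DEFAULT_MAX_RETRY_ATTEMPTS = 3;
  static final long DEFAULT_RETRY_DELAY_MILLIS = 1000;

  private DagProcessingEngineDefaults() {}

  // Converting to Duration in one place keeps unit conversions out of calling code.
  static Duration jobStatusPollingInterval() {
    return Duration.ofSeconds(DEFAULT_JOB_STATUS_POLLING_INTERVAL_SECONDS);
  }

  static Duration retryDelay() {
    return Duration.ofMillis(DEFAULT_RETRY_DELAY_MILLIS);
  }
}
```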
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -171,6 +176,12 @@ public void configure(Binder binder) {
if (serviceConfig.isMultiActiveSchedulerEnabled()) {
binder.bind(MultiActiveLeaseArbiter.class).to(MysqlMultiActiveLeaseArbiter.class);
binder.bind(FlowTriggerHandler.class);
+ if(serviceConfig.isDagProcessingEngineEnabled()) {
+ binder.bind(DagManagementStateStore.class).to(InMemoryDagManagementStateStore.class);
+ binder.bind(DagProcFactory.class).in(Singleton.class);
+ binder.bind(DagProcessingEngine.class).in(Singleton.class);
+ binder.bind(DagTaskStream.class).in(Singleton.class);
Review Comment:
I believe you need to create an `OptionalBinder`, as I've done above with
`OptionalBinder.newOptionalBinder(binder,MultiActiveLeaseArbiter.class);`, since
each of these classes is instantiated only in certain cases.
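For reference, Guice's `OptionalBinder.newOptionalBinder(binder, SomeType.class)` registers a type that consumers can inject as an `Optional<SomeType>`. Calling `.setBinding().to(SomeImpl.class)` on that binder fills in the value only on the code path that actually runs, such as inside the `isDagProcessingEngineEnabled()` branch. The plain-Java sketch below does not use Guice, so it can run standalone. It only illustrates the contract a consumer would see. `DagProcessingEngineStub` and `DagServiceComponents` are hypothetical stand-ins.

```java
import java.util.Optional;

// Hypothetical stand-in for DagProcessingEngine; not the PR's class.
class DagProcessingEngineStub {
}

// Plain-Java analogue of an OptionalBinder-backed injection point: the Optional is
// present only when the configuration path that "binds" the engine actually ran.
final class DagServiceComponents {
  private final Optional<DagProcessingEngineStub> dagProcessingEngine;

  DagServiceComponents(boolean multiActiveSchedulerEnabled, boolean dagProcessingEngineEnabled) {
    // Mirrors the nesting in configure(): the engine is bound only when both flags are on.
    this.dagProcessingEngine = (multiActiveSchedulerEnabled && dagProcessingEngineEnabled)
        ? Optional.of(new DagProcessingEngineStub())
        : Optional.empty();
  }

  Optional<DagProcessingEngineStub> getDagProcessingEngine() {
    return dagProcessingEngine;
  }
}
```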
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+ @Getter
+ private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @NotNull
+ @Override
+ public Iterator<DagTask> iterator() {
+ return new Iterator<DagTask>() {
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public DagTask next() {
+
+ DagTask dagTask = null;
+ while(true) {
+ DagActionStore.DagAction dagAction = take();
+ Properties jobProps = getJobProperties(dagAction);
+ try {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+ if(leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ dagTask = createDagTask(dagAction,
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
+ }
+ if (dagTask != null) {
+ break; // Exit the loop when dagTask is non-null
+ }
Review Comment:
Let's move this check into the conditional above.
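A minimal sketch of `next()` with the null check moved into the lease-obtained branch. Several names are hypothetical: the generic `LeasedTaskSource`, the `tryAcquireLease` predicate, and the `createTask` function. The predicate stands in for `getLeaseOnDagAction` plus the `instanceof LeaseObtainedStatus` test. The sketch also blocks on `BlockingQueue.take()` instead of calling `poll()`. With `poll()`, an empty queue returns null immediately, and the PR's loop would then pass that null to `getJobProperties`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified version of DagTaskStream's iterator logic.
final class LeasedTaskSource<A, T> {
  private final BlockingQueue<A> actions = new LinkedBlockingQueue<>();
  // Stands in for getLeaseOnDagAction(...) followed by the LeaseObtainedStatus check.
  private final Predicate<A> tryAcquireLease;
  // May return null for action types without a task yet, as createDagTask does in the PR.
  private final Function<A, T> createTask;

  LeasedTaskSource(Predicate<A> tryAcquireLease, Function<A, T> createTask) {
    this.tryAcquireLease = tryAcquireLease;
    this.createTask = createTask;
  }

  boolean add(A action) {
    return actions.offer(action);
  }

  T next() throws InterruptedException {
    while (true) {
      // take() blocks until an action is available; poll() would return null instead.
      A action = actions.take();
      if (tryAcquireLease.test(action)) {
        T task = createTask.apply(action);
        // The null check now sits inside the lease-obtained branch.
        if (task != null) {
          return task;
        }
      }
      // No lease, or no task for this action type: move on to the next action.
    }
  }
}
```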
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+ } catch (IOException e) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(e);
+ }
+ }
+ return dagTask;
+ }
+ };
+ }
+
+ private boolean add(DagActionStore.DagAction dagAction) {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ private DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ private DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ protected void complete(DagTask dagTask) throws IOException {
+ dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void resumeFlow(DagActionStore.DagAction killAction, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently resume flow is not supported.");
+ }
+
+ @Override
+ public void killFlow(DagActionStore.DagAction killAction, long eventTimestamp) throws IOException {
+ if(!add(killAction)) {
+ throw new IOException("Could not add kill dag action: " + killAction + " to the queue.");
+ }
+ }
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will enforce on cancelling the job when SLA is reached.
+ * It will return true and cancellation would be initiated via {@link org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+ * by creating {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: {@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+ * The Flow SLA will be set when the {@link Dag} is launched either via Scheduler or REST client
+ * @param node dag node of the job
+ * @return true if the job reached sla needs to be cancelled
+ * @throws ExecutionException exception
+ * @throws InterruptedException exception
+ */
+
Review Comment:
Extra newline. Is it launched any other way?
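The Javadoc quoted just above describes the SLA check. If the flow has an SLA configured, the method reports that the job should be cancelled once the SLA is reached. A minimal sketch of that decision follows. It assumes the SLA is a duration in milliseconds counted from the job's start time, because the PR does not show how the SLA is measured. `FlowSlaCheck` and `slaExceeded` are hypothetical names.

```java
import java.util.Optional;

// Hypothetical helper; the PR's Javadoc describes this behavior but the method body is not quoted.
final class FlowSlaCheck {
  private FlowSlaCheck() {}

  /**
   * Returns true when a flow SLA is configured and the job has been running longer than it.
   * Assumes the SLA is a duration in milliseconds measured from the job's start time.
   */
  static boolean slaExceeded(Optional<Long> flowSlaMillis, long jobStartTimeMillis, long nowMillis) {
    return flowSlaMillis
        .map(sla -> nowMillis - jobStartTimeMillis > sla)
        .orElse(false); // no SLA configured: never cancel
  }
}
```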
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+ private DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
Review Comment:
Let's update this message to be more specific, e.g. `Received flowActionType that
is not handled by createDagTask`. Instead of returning null, we should either
return an `Optional` or throw an exception, and then catch the exception or check
`optional.isPresent()`.
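A minimal sketch of the `Optional` variant with the more specific log message. `FlowActionType`, `DagTask` and `KillDagTask` are simplified, hypothetical stand-ins for the PR's types, and `DagTaskFactory` is a hypothetical name. The sketch uses `java.util.logging` so it runs without dependencies, while the PR itself logs through slf4j (`@Slf4j`).

```java
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical stand-ins for the PR's types.
enum FlowActionType { KILL, RESUME, LAUNCH, ADVANCE }

class DagTask {
  final String dagActionId;

  DagTask(String dagActionId) {
    this.dagActionId = dagActionId;
  }
}

class KillDagTask extends DagTask {
  KillDagTask(String dagActionId) {
    super(dagActionId);
  }
}

final class DagTaskFactory {
  // java.util.logging keeps the sketch dependency-free; the PR logs through slf4j.
  private static final Logger LOG = Logger.getLogger(DagTaskFactory.class.getName());

  private DagTaskFactory() {}

  static Optional<DagTask> createDagTask(String dagActionId, FlowActionType flowActionType) {
    switch (flowActionType) {
      case KILL:
        return Optional.of(new KillDagTask(dagActionId));
      default:
        // Specific message, as suggested in the review, instead of returning null.
        LOG.warning("Received flowActionType " + flowActionType + " that is not handled by createDagTask");
        return Optional.empty();
    }
  }
}
```

The iterator can then check `isPresent()` on the result and return `get()`, so no caller has to remember a null check.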
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements DagManagementStateStore {
+ private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+ private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new HashMap<>();
+ private final Set<String> failedDagIds = new HashSet<>();
+ private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new HashMap<>();
+ // dagToJobs holds a map of dagId to running jobs of that dag
+ final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
Review Comment:
Shall we rename this to `dagIdToRunningJob`?
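With the suggested name, the `addJobState` method quoted in the next hunk could also replace its `containsKey`/`get`/`put` branching with `Map.computeIfAbsent`. The sketch below does this, using a generic `N` in place of `Dag.DagNode<JobExecutionPlan>`. `RunningJobIndex` is a hypothetical name. The sketch also guards `deleteJobState` against a dag id with no entry. In that case, the PR's `this.dagToJobs.get(dagId).remove(dagNode)` would throw a `NullPointerException`.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; N stands in for Dag.DagNode<JobExecutionPlan>.
final class RunningJobIndex<N> {
  // Renamed as suggested: dag id -> running jobs (dag nodes) of that dag.
  private final Map<String, LinkedList<N>> dagIdToRunningJob = new HashMap<>();

  synchronized void addJobState(String dagId, N dagNode) {
    // computeIfAbsent creates the list on first use, replacing the containsKey/put branches.
    dagIdToRunningJob.computeIfAbsent(dagId, id -> new LinkedList<>()).add(dagNode);
  }

  synchronized void deleteJobState(String dagId, N dagNode) {
    List<N> runningJobs = dagIdToRunningJob.get(dagId);
    // Unknown dag ids are ignored instead of throwing a NullPointerException.
    if (runningJobs != null) {
      runningJobs.remove(dagNode);
    }
  }

  synchronized boolean hasRunningJobs(String dagId) {
    List<N> runningJobs = dagIdToRunningJob.get(dagId);
    return runningJobs != null && !runningJobs.isEmpty();
  }
}
```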
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+ final Map<String, Long> dagToSLA = new HashMap<>();
+ private final Set<String> dagIdstoClean = new HashSet<>();
+ private Optional<DagActionStore> dagActionStore;
+
+ @Override
+ public synchronized void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
+ this.jobToDag.remove(dagNode);
+ this.dagToJobs.get(dagId).remove(dagNode);
+ this.dagToSLA.remove(dagId);
+ }
+
+
+ @Override
+ public synchronized void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
+ Dag<JobExecutionPlan> dag = this.dagIdToDags.get(dagId);
+ this.jobToDag.put(dagNode, dag);
+ if (this.dagToJobs.containsKey(dagId)) {
+ this.dagToJobs.get(dagId).add(dagNode);
+ } else {
+ LinkedList<Dag.DagNode<JobExecutionPlan>> dagNodeList = Lists.newLinkedList();
+ dagNodeList.add(dagNode);
+ this.dagToJobs.put(dagId, dagNodeList);
+ }
+ }
+
+ @Override
+ public synchronized boolean hasRunningJobs(String dagId) {
+ List<Dag.DagNode<JobExecutionPlan>> dagNodes = this.dagToJobs.get(dagId);
+ return dagNodes != null && !dagNodes.isEmpty();
+ }
+
+ @Override
+ public synchronized void removeDagActionFromStore(DagManager.DagId dagId, DagActionStore.FlowActionType flowActionType) throws IOException {
+ if (this.dagActionStore.isPresent()) {
+ this.dagActionStore.get().deleteDagAction(
+ new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId, flowActionType));
+ }
+ }
+
+ @Override
+ public void addDagSLA(String dagId, Long flowSla) {
+ this.dagToSLA.putIfAbsent(dagId, flowSla);
+ }
+
+ @Override
+ public Long getDagSLA(String dagId) {
+ if(this.dagToSLA.containsKey(dagId)) {
+ return this.dagToSLA.get(dagId);
+ }
+ return null;
+ }
+
+ @Override
+ public Dag<JobExecutionPlan> getDag(String dagId) {
+ if(this.dagIdToDags.containsKey(dagId)) {
+ return this.dagIdToDags.get(dagId);
+ }
+ return null;
Review Comment:
We should return an `Optional`, not null; it's hard to remember to do the null check.
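A minimal sketch of the suggested change. It uses a generic `D` in place of `Dag<JobExecutionPlan>` and a hypothetical `DagLookup` class. It assumes the map never stores null values, because `Optional.ofNullable` cannot distinguish a stored null from a missing key. The same pattern would apply to `getDagSLA`, which also returns null when no SLA is stored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; D stands in for Dag<JobExecutionPlan>.
final class DagLookup<D> {
  private final Map<String, D> dagIdToDags = new HashMap<>();

  synchronized void putDag(String dagId, D dag) {
    dagIdToDags.put(dagId, dag);
  }

  // One lookup instead of containsKey + get; callers must handle the empty case explicitly.
  // Assumes no null values are stored, since ofNullable treats them like a missing key.
  synchronized Optional<D> getDag(String dagId) {
    return Optional.ofNullable(dagIdToDags.get(dagId));
  }
}
```

Callers then use `getDag(dagId).ifPresent(...)` or `orElseThrow(...)`, so a missing dag cannot slip through as null.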
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+ @Getter
+ private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @NotNull
+ @Override
+ public Iterator<DagTask> iterator() {
+ return new Iterator<DagTask>() {
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public DagTask next() {
+
+ DagTask dagTask = null;
+ while(true) {
+ DagActionStore.DagAction dagAction = take();
+ Properties jobProps = getJobProperties(dagAction);
+ try {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+ if(leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ dagTask = createDagTask(dagAction,
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
+ }
+ if (dagTask != null) {
+ break; // Exit the loop when dagTask is non-null
+ }
+ } catch (IOException e) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(e);
+ }
+ }
+ return dagTask;
+ }
+ };
+ }
+
+ private boolean add(DagActionStore.DagAction dagAction) {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ private DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ private DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ protected void complete(DagTask dagTask) throws IOException {
+ dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void resumeFlow(DagActionStore.DagAction killAction, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently resume flow is not supported.");
+ }
+
+ @Override
+ public void killFlow(DagActionStore.DagAction killAction, long eventTimestamp) throws IOException {
+ if(!add(killAction)) {
+ throw new IOException("Could not add kill dag action: " + killAction + " to the queue.");
+ }
+ }
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will enforce on cancelling the job when SLA is reached.
+ * It will return true and cancellation would be initiated via {@link org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+ * by creating {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: {@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+ * The Flow SLA will be set when the {@link Dag} is launched either via Scheduler or REST client
+ * @param node dag node of the job
+ * @return true if the job reached sla needs to be cancelled
+ * @throws ExecutionException exception
+ * @throws InterruptedException exception
+ */
+
+ @Override
+ public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+ //TODO: need to fetch timestamps from database when multi-active is enabled
+ long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+ long currentTime = System.currentTimeMillis();
+ String dagId = DagManagerUtils.generateDagId(node).toString();
+
+ long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+ if (currentTime > flowStartTime + flowSla) {
+ log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the job {} ...",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ dagManagerMetrics.incrementExecutorSlaExceeded(node);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Enforce cancel on the job if the job has been "orphaned". A job is orphaned if has been in ORCHESTRATED
+ * {@link ExecutionStatus} for some specific amount of time.
+ * @param node {@link Dag.DagNode} representing the job
+ * @param jobStatus current {@link JobStatus} of the job
+ * @return true if the total time that the job remains in the ORCHESTRATED state exceeds
+ * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+ */
+
+ @Override
+ public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, JobStatus jobStatus) throws ExecutionException, InterruptedException {
+ if (jobStatus == null) {
+ return false;
+ }
+ ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+ //TODO: timestamps needs to be fetched from database instead of using System.currentTimeMillis()
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, System.currentTimeMillis());
+ long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+ if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
+ log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Enforce cancellation of the job ...",
+ DagManagerUtils.getJobName(node),
+ DagManagerUtils.getFullyQualifiedDagName(node),
+ timeOutForJobStart);
+ dagManagerMetrics.incrementCountsStartSlaExceeded(node);
Review Comment:
It looks like this doesn't actually kill or restart the job, right? We just
return a boolean. Let's rename this function and the one above to
`is...DeadlineExceeded`, and have a separate function that enforces the
deadline and performs the action.
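A minimal sketch of the suggested split, assuming a plain millisecond clock. `DeadlineChecker`, `isFlowCompletionDeadlineExceeded`, and the `Runnable` enforcement hook are hypothetical names, not existing Gobblin APIs. The check is a side-effect-free predicate; enforcement is a separate method that acts only when the predicate is true.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch (not a Gobblin API): separates the deadline check from the action taken on it.
 */
final class DeadlineChecker {
  // Injectable clock so the check is testable; in production this could be System::currentTimeMillis.
  private final LongSupplier clock;

  DeadlineChecker(LongSupplier clock) {
    this.clock = clock;
  }

  /** Pure predicate with no side effects: true once more than slaMillis have elapsed since startMillis. */
  boolean isFlowCompletionDeadlineExceeded(long startMillis, long slaMillis) {
    return clock.getAsLong() > startMillis + slaMillis;
  }

  /** Runs the enforcement action only if the deadline has been exceeded; returns whether it ran. */
  boolean enforceFlowCompletionDeadline(long startMillis, long slaMillis, Runnable enforcement) {
    if (!isFlowCompletionDeadlineExceeded(startMillis, slaMillis)) {
      return false;
    }
    enforcement.run(); // e.g. enqueue a KILL dag action (hypothetical wiring)
    return true;
  }
}
```

Callers that only need to know whether a deadline passed (metrics, logging) can use the predicate, while the DagProc that owns the kill path calls the enforcing variant.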
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/CleanUpDagProc.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} that is responsible for cleaning up {@link Dag} that has reached an end state
+ * likewise: FAILED, COMPLETE or CANCELED
Review Comment:
nit: `ie:` or `including:`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord message) {
// We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
try {
+
if (operation.equals("INSERT")) {
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
log.info("Received insert dag action and about to send resume flow request");
dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
+ //TODO: add a flag for if condition only if multi-active is enabled
this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
log.info("Received insert dag action and about to send kill flow request");
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
Review Comment:
The conditional above should be checked first, and this should be the else
branch, right?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+ abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException;
+ abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception;
+ abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException;
+
+ public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+ try {
+ S state = this.initializeWithRetries(dagManagementStateStore, maxRetryCount, delayRetryMillis);
+ R result = this.actWithRetries(state, dagManagementStateStore, maxRetryCount, delayRetryMillis); // may be pass state store too here
+ this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, delayRetryMillis);
+ log.info("Successfully processed Dag Request");
+ } catch (Exception ex) {
+ throw new RuntimeException("Cannot process Dag Request: ", ex);
+ }
+ }
+
+ protected final S initializeWithRetries(DagManagementStateStore dagManagementStateStore, int maxRetryCount, long delayRetryMillis) throws IOException {
+ for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+ try {
+ return this.initialize(dagManagementStateStore);
+ } catch (MaybeRetryableException e) {
+ if (retryCount < maxRetryCount - 1) { // Don't wait before the last retry
+ waitBeforeRetry(delayRetryMillis);
+ }
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ throw new RuntimeException("Max retry attempts reached. Cannot initialize Dag");
+ }
+
+ protected final R actWithRetries(S state, DagManagementStateStore dagManagementStateStore, int maxRetryCount, long delayRetryMillis) {
Review Comment:
Should we have a generic method that handles retries and takes a function as
a parameter? It looks like you are repeating the same retry logic in these two
functions and want to handle both the same way.
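One possible shape for such a helper, sketched under the assumption that only a retryable exception type (here a hypothetical `RetryableException` standing in for `MaybeRetryableException`) should trigger a retry. `Retries.withRetries` is a hypothetical name; like the existing `initializeWithRetries`, it does not sleep after the final attempt.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for MaybeRetryableException: the only exception type that triggers a retry. */
class RetryableException extends Exception {
  RetryableException(String message) {
    super(message);
  }
}

/** Hypothetical generic retry helper; not an existing Gobblin class. */
final class Retries {
  private Retries() {}

  /**
   * Calls {@code action} up to {@code maxAttempts} times, sleeping {@code delayMillis} between attempts.
   * Only RetryableException is retried; any other exception propagates immediately.
   */
  static <T> T withRetries(Callable<T> action, int maxAttempts, long delayMillis) throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    RetryableException lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (RetryableException e) {
        lastFailure = e;
        if (attempt < maxAttempts) { // don't wait after the final attempt
          Thread.sleep(delayMillis);
        }
      }
    }
    throw new IllegalStateException("Max retry attempts (" + maxAttempts + ") reached", lastFailure);
  }
}
```

With a helper like this, each `...WithRetries` method could reduce to a call such as `Retries.withRetries(() -> initialize(dagManagementStateStore), maxRetryCount, delayRetryMillis)`. Note one design difference: the current `initializeWithRetries` wraps `IOException` in a `RuntimeException`, whereas this sketch lets non-retryable exceptions propagate unchanged and leaves wrapping to the caller.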
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagManager} would pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+
+ @Getter
+ private final BlockingDeque<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingDeque<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+
+ @Override
+ public Optional<DagTask> next() {
+
+ DagActionStore.DagAction dagAction = dagActionQueue.peek();
+ try {
+ Preconditions.checkArgument(dagAction != null, "No Dag Action found in the queue");
+ Properties jobProps = getJobProperties(dagAction);
+ Optional<MultiActiveLeaseArbiter.LeaseObtainedStatus> leaseObtainedStatus =
+ flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis()).toJavaUtil();
+ if(leaseObtainedStatus.isPresent()) {
+ DagTask dagTask = createDagTask(dagAction, leaseObtainedStatus.get());
+ return Optional.of(dagTask);
+ }
+ } catch (Exception ex) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(ex);
+ }
+ return Optional.empty();
+ }
+
+ public boolean add(DagActionStore.DagAction dagAction) throws IOException {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ public DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ public DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long triggerTimeStamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void resumeFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp) throws IOException, InterruptedException {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void killFlow(String flowGroup, String flowName, String flowExecutionId, long produceTimestamp) throws IOException {
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
+ if(!this.dagManagementStateStore.getDagIdToDags().containsKey(dagId.toString())) {
+ log.info("Invalid dag since not present in map. Hence cannot cancel it");
+ return;
+ }
+ DagActionStore.DagAction killAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.KILL);
+ if(!add(killAction)) {
+ throw new IOException("Could not add kill dag action: " + killAction + " to the queue.");
+ }
+ }
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will try to cancel the job when SLA is reached.
+ *
+ * @param node dag node of the job
+ * @return true if the job is killed because it reached sla
+ * @throws ExecutionException exception
+ * @throws InterruptedException exception
+ */
+
+ @Override
+ public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+ //TODO: need to distribute the responsibility outside of this class
+ long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+ long currentTime = System.currentTimeMillis();
+ String dagId = DagManagerUtils.generateDagId(node).toString();
+
+ long flowSla;
+ if (this.dagManagementStateStore.getDagToSLA().containsKey(dagId)) {
+ flowSla = this.dagManagementStateStore.getDagToSLA().get(dagId);
+ } else {
+ try {
+ flowSla = DagManagerUtils.getFlowSLA(node);
+ } catch (ConfigException e) {
+ log.warn("Flow SLA for flowGroup: {}, flowName: {} is given in invalid format, using default SLA of {}",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY),
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY),
+ DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS);
+ flowSla = DagManagerUtils.DEFAULT_FLOW_SLA_MILLIS;
+ }
+ this.dagManagementStateStore.getDagToSLA().put(dagId, flowSla);
+ }
+
+ if (currentTime > flowStartTime + flowSla) {
+ log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ dagManagerMetrics.incrementExecutorSlaExceeded(node);
+ KillDagProc.killDagNode(node);
+
+ this.dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
+ this.dagManagementStateStore.getDagIdToDags().get(dagId).setMessage("Flow killed due to exceeding SLA of " + flowSla + " ms");
+
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Cancel the job if the job has been "orphaned". A job is orphaned if has been in ORCHESTRATED
+ * {@link ExecutionStatus} for some specific amount of time.
+ * @param node {@link Dag.DagNode} representing the job
+ * @param jobStatus current {@link JobStatus} of the job
+ * @return true if the total time that the job remains in the ORCHESTRATED state exceeds
+ * {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
+ */
+
+ @Override
+ public boolean enforceJobStartDeadline(Dag.DagNode<JobExecutionPlan> node, JobStatus jobStatus) throws ExecutionException, InterruptedException {
+ //TODO: need to distribute the responsibility outside of this class
+ if (jobStatus == null) {
+ return false;
+ }
+ ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
+ //TODO: initialize default job sla in millis via configs
+ long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, System.currentTimeMillis());
+ long jobOrchestratedTime = jobStatus.getOrchestratedTime();
+ if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
+ log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
+ DagManagerUtils.getJobName(node),
+ DagManagerUtils.getFullyQualifiedDagName(node),
+ timeOutForJobStart);
+ dagManagerMetrics.incrementCountsStartSlaExceeded(node);
+ KillDagProc.killDagNode(node);
+
+ String dagId = DagManagerUtils.generateDagId(node).toString();
+ dagManagementStateStore.getDagIdToDags().get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
+ dagManagementStateStore.getDagIdToDags().get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
+ return true;
+ } else {
+ return false;
+ }
+
+ }
+
+ /**
+ * Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
+ */
+
+ protected JobStatus retrieveJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
Review Comment:
+1, we can move these to `DagManagementStateStore` and call those methods
where needed.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
+ */
+public interface DagManagementStateStore {
+
+ public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+ public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
+
+ public boolean hasRunningJobs(String dagId);
+
+ public void removeDagActionFromStore(DagManager.DagId dagId, DagActionStore.FlowActionType flowActionType) throws IOException;
+
+ public void addDagSLA(String dagId, Long flowSla);
+
+ public Long getDagSLA(String dagId);
+
+ public Dag<JobExecutionPlan> getDag(String dagId);
+
+ public LinkedList<Dag.DagNode<JobExecutionPlan>> getJobs(String dagId) throws IOException;
+
+ public boolean addFailedDagId(String dagId);
Review Comment:
Does this add to the failedDagId store?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+ private boolean add(DagActionStore.DagAction dagAction) {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ private DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
Review Comment:
Can this method or the one above throw any exceptions?
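For reference, per the `java.util.concurrent` contracts: `offer(e)` throws `NullPointerException` for a null element and, on a default-capacity `LinkedBlockingQueue`, returns `false` only at capacity (`Integer.MAX_VALUE`); `poll()` never throws but returns `null` when the queue is empty, and `next()` then passes that `null` to `getJobProperties`. A minimal sketch of a blocking alternative that uses `take()` instead, with a hypothetical generic `BlockingActionStream` standing in for `DagTaskStream`:

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch, not the PR's class: an endless iterator whose next() blocks
 * until an action is queued, instead of receiving null from an empty queue.
 */
final class BlockingActionStream<T> implements Iterable<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

  /** offer() throws NullPointerException for null; otherwise false only at capacity (Integer.MAX_VALUE). */
  boolean add(T action) {
    return queue.offer(action);
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return true; // endless stream: next() waits for the next element
      }

      @Override
      public T next() {
        try {
          return queue.take(); // blocks while empty; never returns null
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
          throw new IllegalStateException("Interrupted while waiting for the next action", e);
        }
      }
    };
  }
}
```

Blocking in `take()` also removes the busy `while(true)` spin over `poll()` when no work is queued; the trade-off is that shutdown then has to interrupt the consuming thread.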
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+ @Getter
+ private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
Review Comment:
These fields can be `final` as well.
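Here is a minimal sketch of the `final` suggestion. It is plain Java without Lombok, so it compiles on its own. `ActionStreamSketch`, `TriggerHandler`, `StateStore` and `Metrics` are hypothetical stand-ins for `DagTaskStream`, `FlowTriggerHandler`, `DagManagementStateStore` and `DagManagerMetrics`. `String` stands in for `DagActionStore.DagAction`. Once every collaborator is `final`, each one must be assigned exactly once in the constructor, and the compiler rejects any later reassignment.

```java
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for FlowTriggerHandler, DagManagementStateStore and DagManagerMetrics.
interface TriggerHandler {}

interface StateStore {}

interface Metrics {}

// Hypothetical stand-in for DagTaskStream with every field final.
class ActionStreamSketch {
  // Initialized inline, so it is not a constructor parameter.
  private final BlockingQueue<String> actionQueue = new LinkedBlockingQueue<>();
  private final TriggerHandler triggerHandler;
  private final StateStore stateStore;
  private final Metrics metrics;

  ActionStreamSketch(TriggerHandler triggerHandler, StateStore stateStore, Metrics metrics) {
    // Reject missing collaborators up front instead of failing on first use.
    this.triggerHandler = Objects.requireNonNull(triggerHandler, "triggerHandler");
    this.stateStore = Objects.requireNonNull(stateStore, "stateStore");
    this.metrics = Objects.requireNonNull(metrics, "metrics");
  }

  BlockingQueue<String> getActionQueue() {
    return actionQueue;
  }

  TriggerHandler getTriggerHandler() {
    return triggerHandler;
  }
}
```

With Lombok, marking the three injected fields `final` should still work with `@AllArgsConstructor`. Lombok leaves inline-initialized final fields such as `dagActionQueue` out of generated constructors, and `@RequiredArgsConstructor` would produce the same constructor.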
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore, int, long)}
+ */
+
+@Alpha
+@Slf4j
+public class DagProcessingEngine {
+
+ public static final String NUM_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
+
+
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+ private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
+ private static final Integer DEFAULT_MAX_RETRY_ATTEMPTS = 3;
+ private static final long DEFAULT_RETRY_DELAY_MS = 1000;
Review Comment:
These could also be declared here, or in the configuration files above as is usual for other configurations, but I'm less particular about it.
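If the defaults stay in the code, one option is to keep each key next to its default and read them all through one helper. This is a minimal sketch of that approach. It uses `java.util.Properties` in place of the Typesafe `Config` the diff uses, so it runs without dependencies. `DagProcessingSettings`, its key strings and `getInt`/`getLong` are hypothetical names; only the default values are copied from the diff. Using primitive `int`/`long` for the defaults also avoids the boxed `Integer` constants.

```java
import java.util.Properties;

// Hypothetical holder that keeps each key next to its default value.
final class DagProcessingSettings {
  // Hypothetical key names; the real prefix is still under discussion in this review.
  static final String NUM_THREADS_KEY = "dagProcessingEngine.numThreads";
  static final int DEFAULT_NUM_THREADS = 3;
  static final String JOB_STATUS_POLLING_INTERVAL_KEY = "dagProcessingEngine.pollingInterval";
  static final int DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
  static final String MAX_RETRY_ATTEMPTS_KEY = "dagProcessingEngine.maxRetryAttempts";
  static final int DEFAULT_MAX_RETRY_ATTEMPTS = 3;
  static final String RETRY_DELAY_MS_KEY = "dagProcessingEngine.retryDelayMillis";
  static final long DEFAULT_RETRY_DELAY_MS = 1000L;

  private DagProcessingSettings() {}

  // Returns the configured int, or the default when the key is absent.
  static int getInt(Properties props, String key, int defaultValue) {
    String raw = props.getProperty(key);
    return raw == null ? defaultValue : Integer.parseInt(raw.trim());
  }

  // Returns the configured long, or the default when the key is absent.
  static long getLong(Properties props, String key, long defaultValue) {
    String raw = props.getProperty(key);
    return raw == null ? defaultValue : Long.parseLong(raw.trim());
  }
}
```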
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would pull from to process, as it is ready for more work.
Review Comment:
This sentence is a bit hard to read; consider something like `Maintains a stream of ... that ... it polls when ready for more work to process`.
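The iterator's `hasNext()` always returns `true`, so `next()` has to wait until an action is queued. In the quoted diff, the private `take()` helper calls `BlockingQueue.poll()`, which returns `null` immediately when the queue is empty. `BlockingQueue.take()` is the call that actually waits. This is a minimal sketch of such a stream. It uses a generic element type in place of `DagActionStore.DagAction` and leaves out leasing. `BlockingActionStream` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an endless stream whose iterator waits for the next queued item.
class BlockingActionStream<T> implements Iterable<T> {
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

  // Producer side, e.g. what killFlow does with a kill action.
  boolean add(T item) {
    return queue.offer(item);
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return true; // the stream never ends; next() waits instead
      }

      @Override
      public T next() {
        try {
          return queue.take(); // blocks until an item is available, unlike poll()
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt for the caller
          throw new NoSuchElementException("Interrupted while waiting for the next item");
        }
      }
    };
  }
}
```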
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+
+
+ @NotNull
+ @Override
+ public Iterator<DagTask> iterator() {
+ return new Iterator<DagTask>() {
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public DagTask next() {
+
+ DagTask dagTask = null;
+ while(true) {
+ DagActionStore.DagAction dagAction = take();
+ Properties jobProps = getJobProperties(dagAction);
+ try {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+ if(leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ dagTask = createDagTask(dagAction,
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
+ }
+ if (dagTask != null) {
+ break; // Exit the loop when dagTask is non-null
+ }
+ } catch (IOException e) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(e);
+ }
+ }
+ return dagTask;
+ }
+ };
+ }
+
+ private boolean add(DagActionStore.DagAction dagAction) {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ private DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ private DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ protected void complete(DagTask dagTask) throws IOException {
+ dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void resumeFlow(DagActionStore.DagAction killAction, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently resume flow is not supported.");
+ }
+
+ @Override
+ public void killFlow(DagActionStore.DagAction killAction, long eventTimestamp) throws IOException {
+ if(!add(killAction)) {
+ throw new IOException("Could not add kill dag action: " + killAction + " to the queue.");
+ }
+ }
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will enforce on cancelling the job when SLA is reached.
+ * It will return true and cancellation would be initiated via {@link org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+ * by creating {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: {@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+ * The Flow SLA will be set when the {@link Dag} is launched either via Scheduler or REST client
+ * @param node dag node of the job
+ * @return true if the job reached sla needs to be cancelled
+ * @throws ExecutionException exception
+ * @throws InterruptedException exception
+ */
+
+ @Override
+ public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+ //TODO: need to fetch timestamps from database when multi-active is enabled
+ long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+ long currentTime = System.currentTimeMillis();
+ String dagId = DagManagerUtils.generateDagId(node).toString();
+
+ long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+ if (currentTime > flowStartTime + flowSla) {
+ log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the job {} ...",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
Review Comment:
Good descriptive message. Let's add one detail: say it exceeded the _completion SLA_, as opposed to the start SLA.
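This is a minimal sketch of the check using the wording the comment asks for. It is written as pure functions so the boundary is easy to test. `CompletionSlaCheck` and its methods are hypothetical. The sketch also assumes that a `null` from `getDagSLA` (declared to return `Long`) means no completion SLA is configured. In the diff, assigning that `null` to the primitive `long flowSla` would throw a `NullPointerException` on unboxing.

```java
// Hypothetical helper mirroring the check in enforceFlowCompletionDeadline.
final class CompletionSlaCheck {
  private CompletionSlaCheck() {}

  // True only when a completion SLA is configured and the deadline has strictly passed.
  static boolean exceedsCompletionSla(long flowStartTimeMillis, Long completionSlaMillis, long nowMillis) {
    if (completionSlaMillis == null) {
      return false; // assumption: null means no completion SLA was configured for the flow
    }
    return nowMillis > flowStartTimeMillis + completionSlaMillis;
  }

  // Log text that names the completion SLA explicitly, as opposed to the start SLA.
  static String describe(String flowName, long completionSlaMillis, String jobName) {
    return String.format(
        "Flow %s exceeded its completion SLA of %d ms. Enforcing cancellation of job %s ...",
        flowName, completionSlaMillis, jobName);
  }
}
```

In the real `log.info` call, the same text would keep the `{}` placeholders instead of `String.format`.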
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will enforce on cancelling the job when SLA is reached.
+ * It will return true and cancellation would be initiated via {@link org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+ * by creating {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: {@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
Review Comment:
We can shorten this, since the "@return true" below already covers it; just keep the `via ...` part and join it with the sentence above.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,127 @@
+ public static final String NUM_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "numThreads";
+ public static final String JOB_STATUS_POLLING_INTERVAL_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "pollingInterval";
+ public static final String MAX_RETRY_ATTEMPTS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "maxRetryAttempts";
+ public static final String RETRY_DELAY_MS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY + "retryDelayMillis";
Review Comment:
Let's put these in `ServiceConfigKeys` or `ConfigurationKeys`. I also agree with kip's comment above that a `dag_processing_prefix` should be used before them all.
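This is a minimal sketch of what that could look like. In the diff, the keys are built by appending to `GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_ENABLED_KEY`, whose name suggests it is the on/off flag, so the flag and the tuning keys share one stem. `DagProcessingConfigKeys` and the prefix value below are hypothetical; only the key suffixes come from the diff.

```java
// Hypothetical constants holder; in Gobblin these would live in ServiceConfigKeys or ConfigurationKeys.
final class DagProcessingConfigKeys {
  private DagProcessingConfigKeys() {}

  // Hypothetical prefix value; the review only asks that a dedicated prefix exist.
  static final String DAG_PROCESSING_PREFIX = "gobblin.service.dagProcessingEngine.";

  // Suffixes taken from the diff, now hanging off the dedicated prefix.
  static final String NUM_THREADS_KEY = DAG_PROCESSING_PREFIX + "numThreads";
  static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_PROCESSING_PREFIX + "pollingInterval";
  static final String MAX_RETRY_ATTEMPTS_KEY = DAG_PROCESSING_PREFIX + "maxRetryAttempts";
  static final String RETRY_DELAY_MS_KEY = DAG_PROCESSING_PREFIX + "retryDelayMillis";
}
```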
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will enforce on cancelling the job when SLA is reached.
Review Comment:
remove `on`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jetbrains.annotations.NotNull;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask} that {@link DagProcessingEngine} would
pull from to process, as it is ready for more work.
+ * It provides an implementation for {@link DagManagement} that defines the
rules for a flow and job.
+ * Implements {@link Iterable} to provide {@link DagTask}s as soon as it's
available to {@link DagProcessingEngine}
+ */
+
+@Alpha
+@Slf4j
+@AllArgsConstructor
+public class DagTaskStream implements Iterable<DagTask>, DagManagement {
+
+ @Getter
+ private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new
LinkedBlockingQueue<>();
+ private FlowTriggerHandler flowTriggerHandler;
+ private DagManagementStateStore dagManagementStateStore;
+ private DagManagerMetrics dagManagerMetrics;
+
+
+ @NotNull
+ @Override
+ public Iterator<DagTask> iterator() {
+ return new Iterator<DagTask>() {
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public DagTask next() {
+
+ DagTask dagTask = null;
+ while(true) {
+ DagActionStore.DagAction dagAction = take();
+ Properties jobProps = getJobProperties(dagAction);
+ try {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+ if(leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ dagTask = createDagTask(dagAction,
+ (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
+ }
+ if (dagTask != null) {
+ break; // Exit the loop when dagTask is non-null
+ }
+ } catch (IOException e) {
+ //TODO: need to handle exceptions gracefully
+ throw new RuntimeException(e);
+ }
+ }
+ return dagTask;
+ }
+ };
+ }
+
+ private boolean add(DagActionStore.DagAction dagAction) {
+ return this.dagActionQueue.offer(dagAction);
+ }
+
+ private DagActionStore.DagAction take() {
+ return this.dagActionQueue.poll();
+ }
+
+ private DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+ switch (flowActionType) {
+ case KILL:
+ return new KillDagTask(dagAction, leaseObtainedStatus);
+ case RESUME:
+ case LAUNCH:
+ case ADVANCE:
+ default:
+ log.warn("It should not reach here. Yet to provide implementation.");
+ return null;
+ }
+ }
+
+ protected void complete(DagTask dagTask) throws IOException {
+ dagTask.conclude(this.flowTriggerHandler.getMultiActiveLeaseArbiter());
+ }
+
+ @Override
+ public void launchFlow(String flowGroup, String flowName, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently launch flow is not supported.");
+ }
+
+ @Override
+ public void resumeFlow(DagActionStore.DagAction killAction, long eventTimestamp) {
+ //TODO: provide implementation after finalizing code flow
+ throw new UnsupportedOperationException("Currently resume flow is not supported.");
+ }
+
+ @Override
+ public void killFlow(DagActionStore.DagAction killAction, long eventTimestamp) throws IOException {
+ if(!add(killAction)) {
+ throw new IOException("Could not add kill dag action: " + killAction + " to the queue.");
+ }
+ }
+ /**
+ * Check if the SLA is configured for the flow this job belongs to.
+ * If it is, this method will enforce on cancelling the job when SLA is reached.
+ * It will return true and cancellation would be initiated via {@link org.apache.gobblin.service.modules.orchestration.proc.AdvanceDagProc}
+ * by creating {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} on the flowtype: {@link org.apache.gobblin.runtime.api.DagActionStore.FlowActionType#CANCEL}
+ * The Flow SLA will be set when the {@link Dag} is launched either via Scheduler or REST client
+ * @param node dag node of the job
+ * @return true if the job reached sla needs to be cancelled
+ * @throws ExecutionException exception
+ * @throws InterruptedException exception
+ */
+
+ @Override
+ public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
+ //TODO: need to fetch timestamps from database when multi-active is enabled
+ long flowStartTime = DagManagerUtils.getFlowStartTime(node);
+ long currentTime = System.currentTimeMillis();
+ String dagId = DagManagerUtils.generateDagId(node).toString();
+
+ long flowSla = this.dagManagementStateStore.getDagSLA(dagId);
+
+ if (currentTime > flowStartTime + flowSla) {
+ log.info("Flow {} exceeded the SLA of {} ms. Enforce cancellation of the job {} ...",
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
+ node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ dagManagerMetrics.incrementExecutorSlaExceeded(node);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Enforce cancel on the job if the job has been "orphaned". A job is orphaned if has been in ORCHESTRATED
+ * {@link ExecutionStatus} for some specific amount of time.
Review Comment:
specific amount of time determined by config?
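To make that "specific amount of time" configurable rather than hard-coded, the timeout could be read from configuration with a fallback default. The sketch below assumes a plain `java.util.Properties` source; the class name, the `JobState` stand-in enum, the property key `dag.orphan.timeout.ms` and the 10-minute default are all hypothetical and not taken from Gobblin.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: the class, the enum and the property key are hypothetical,
// not Gobblin APIs or configuration keys.
final class OrphanedJobChecker {
  /** Stand-in for the relevant ExecutionStatus values. */
  enum JobState { ORCHESTRATED, RUNNING }

  /** Hypothetical property key for the orphan timeout, in milliseconds. */
  static final String ORPHAN_TIMEOUT_MS_KEY = "dag.orphan.timeout.ms";
  /** Hypothetical default used when the key is absent. */
  static final long DEFAULT_ORPHAN_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(10);

  private final long orphanTimeoutMs;

  OrphanedJobChecker(Properties props) {
    // Read the timeout from config, falling back to the default.
    this.orphanTimeoutMs = Long.parseLong(
        props.getProperty(ORPHAN_TIMEOUT_MS_KEY, String.valueOf(DEFAULT_ORPHAN_TIMEOUT_MS)));
  }

  long getOrphanTimeoutMs() {
    return orphanTimeoutMs;
  }

  /** True if the job is still ORCHESTRATED and has been for longer than the configured timeout. */
  boolean isOrphaned(JobState state, long orchestratedAtMs, long nowMs) {
    return state == JobState.ORCHESTRATED && nowMs - orchestratedAtMs > orphanTimeoutMs;
  }
}
```

Keeping the timeout in one constructor-read field means the check itself stays a pure function of state and timestamps, which is easy to unit test.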
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagManagementStateStore} to provide information about dags, dag nodes and their job states.
+ * This store maintains and utilizes in-memory references about dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions needs to be
+ * taken next likewise mark it as: complete, failed, sla breached or simply clean up after completion.
+ * Going forward, each of these in-memory references will be read/write from MySQL store.
+ * Thus, the {@link DagManager} would then be stateless and operate independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class InMemoryDagManagementStateStore implements DagManagementStateStore {
+ private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+ private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new HashMap<>();
+ private final Set<String> failedDagIds = new HashSet<>();
+ private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new HashMap<>();
+ // dagToJobs holds a map of dagId to running jobs of that dag
+ final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
+ final Map<String, Long> dagToSLA = new HashMap<>();
+ private final Set<String> dagIdstoClean = new HashSet<>();
+ private Optional<DagActionStore> dagActionStore;
+
+ @Override
+ public synchronized void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
Review Comment:
may throw exception if the `dagNode` does not exist
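A defensive `deleteJobState` could check that both the dag entry and the node exist before removing anything, and report the outcome instead of throwing. The sketch below is a simplified stand-in: `RunningJobsStore` is hypothetical, and a generic node type `N` replaces `Dag.DagNode<JobExecutionPlan>`; it only mirrors the `dagToJobs` map declared above.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Simplified, hypothetical stand-in for the in-memory store: a generic node type N
// replaces Dag.DagNode<JobExecutionPlan>. Not the PR's actual implementation.
final class RunningJobsStore<N> {
  // dagId -> running jobs of that dag, mirroring dagToJobs
  private final Map<String, LinkedList<N>> dagToJobs = new HashMap<>();

  synchronized void addJobState(String dagId, N node) {
    dagToJobs.computeIfAbsent(dagId, k -> new LinkedList<>()).add(node);
  }

  /** Removes the node if present; returns false instead of throwing when the dag or node is unknown. */
  synchronized boolean deleteJobState(String dagId, N node) {
    LinkedList<N> jobs = dagToJobs.get(dagId);
    if (jobs == null || !jobs.remove(node)) {
      return false;
    }
    if (jobs.isEmpty()) {
      dagToJobs.remove(dagId); // drop empty entries so hasRunningJobs stays accurate
    }
    return true;
  }

  synchronized boolean hasRunningJobs(String dagId) {
    LinkedList<N> jobs = dagToJobs.get(dagId);
    return jobs != null && !jobs.isEmpty();
  }
}
```

Returning a boolean lets the caller decide whether a missing node is an error (e.g. log and continue) rather than forcing an exception on every stale delete.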
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/AdvanceDagProc.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} dealing which advancing to the next node in the {@link Dag}.
Review Comment:
`dealing WITH`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryDagManagementStateStore.java:
##########
@@ -0,0 +1,143 @@
+ final Map<String, Long> dagToSLA = new HashMap<>();
Review Comment:
`dagIdToSLA/Deadline`
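Following the naming suggestion, the map could be named for what it holds, and the getter could return an `OptionalLong`, so that a dag without a configured deadline does not cause a `NullPointerException` when a `null` `Long` is unboxed, as `long flowSla = this.dagManagementStateStore.getDagSLA(dagId);` would if the dag has no entry. The class and method names below are hypothetical; the exceeded check mirrors `currentTime > flowStartTime + flowSla` from `enforceFlowCompletionDeadline`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Illustrative only: names follow the reviewer's naming suggestion and are not taken from the PR.
final class DagDeadlines {
  private final Map<String, Long> dagIdToFlowCompletionDeadlineMs = new HashMap<>();

  synchronized void setFlowCompletionDeadline(String dagId, long deadlineMs) {
    dagIdToFlowCompletionDeadlineMs.put(dagId, deadlineMs);
  }

  /** Empty when no deadline is configured, so callers never unbox a null Long. */
  synchronized OptionalLong getFlowCompletionDeadline(String dagId) {
    Long deadline = dagIdToFlowCompletionDeadlineMs.get(dagId);
    return deadline == null ? OptionalLong.empty() : OptionalLong.of(deadline);
  }

  /** Same rule as enforceFlowCompletionDeadline: exceeded when now > start + deadline. */
  boolean isFlowCompletionDeadlineExceeded(String dagId, long flowStartTimeMs, long nowMs) {
    OptionalLong deadline = getFlowCompletionDeadline(dagId);
    return deadline.isPresent() && nowMs > flowStartTimeMs + deadline.getAsLong();
  }
}
```

A separate map with its own name (for example a start deadline next to the completion deadline) can be added the same way if both deadlines need to be tracked.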
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/CleanUpDagProc.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+
+
+/**
+ * An implementation of {@link DagProc} that is responsible for cleaning up {@link Dag} that has reached an end state
+ * likewise: FAILED, COMPLETE or CANCELED
+ *
+ */
+@Slf4j
+@Alpha
+public class CleanUpDagProc extends DagProc {
+
+ private CleanUpDagTask cleanUpDagTask;
+
+ public CleanUpDagProc(CleanUpDagTask cleanUpDagTask) {
+ this.cleanUpDagTask = cleanUpDagTask;
+ }
+
+ @Override
+ protected Object initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException {
+ throw new UnsupportedOperationException("Not supported");
+
Review Comment:
extra space
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.orchestration.processor.DagProc;
+import org.apache.gobblin.service.modules.orchestration.processor.KillDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.AdvanceDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.CleanUpDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.KillDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.orchestration.task.ResumeDagTask;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.
+ */
+
+@Alpha
+@Slf4j
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+
+ private DagManagementStateStore dagManagementStateStore;
+ private JobStatusRetriever jobStatusRetriever;
+ private FlowStatusGenerator flowStatusGenerator;
+ private UserQuotaManager quotaManager;
+ private SpecCompiler specCompiler;
+ private FlowCatalog flowCatalog;
+ private FlowCompilationValidationHelper flowCompilationValidationHelper;
+ private Config config;
+ private Optional<EventSubmitter> eventSubmitter;
+ private boolean instrumentationEnabled;
+
Review Comment:
Since the previous revision already attempts to initialize this with Guice, let's also add the `@Inject` annotation to the constructor.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -313,4 +317,42 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) {
Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
}
+
+ /**
+ * Attempts to acquire lease for a given {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+ * through lease arbitration and if it fails, it will create and schedule a reminder trigger to check back again.
Review Comment:
let's update this in the next iteration
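The behavior this Javadoc describes (try to acquire a lease; if that fails, schedule a reminder to try again) can be sketched with a `ScheduledExecutorService`. `LeaseOrReminder`, its `LeaseArbiter` interface and the bounded `attemptsLeft` retry policy are hypothetical stand-ins, not the `MultiActiveLeaseArbiter` API; in particular, the sketch does not model another participant completing the action in the meantime.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: LeaseOrReminder and LeaseArbiter are stand-ins,
// not Gobblin's MultiActiveLeaseArbiter API.
final class LeaseOrReminder {
  /** Stand-in for lease arbitration: returns true if the lease on the action was obtained. */
  interface LeaseArbiter {
    boolean tryAcquire(String actionId);
  }

  private final LeaseArbiter arbiter;
  private final ScheduledExecutorService scheduler;
  private final long reminderDelayMs;

  LeaseOrReminder(LeaseArbiter arbiter, ScheduledExecutorService scheduler, long reminderDelayMs) {
    this.arbiter = arbiter;
    this.scheduler = scheduler;
    this.reminderDelayMs = reminderDelayMs;
  }

  /**
   * Tries to acquire the lease; on success runs onLeaseObtained, otherwise schedules a
   * reminder that retries after reminderDelayMs, giving up once attemptsLeft reaches zero.
   */
  void attempt(String actionId, int attemptsLeft, Consumer<String> onLeaseObtained) {
    if (attemptsLeft <= 0) {
      return; // hypothetical policy: stop retrying
    }
    if (arbiter.tryAcquire(actionId)) {
      onLeaseObtained.accept(actionId);
    } else {
      scheduler.schedule(
          () -> attempt(actionId, attemptsLeft - 1, onLeaseObtained),
          reminderDelayMs, TimeUnit.MILLISECONDS);
    }
  }
}
```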
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states
Review Comment:
add a more detailed description, e.g. `for a Dag, allows one to update or extract its job state or ...`, giving a high-level overview
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+ // instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor
+// this.dagTaskStream = new DagTaskStream();
Review Comment:
yea, we should use Guice to bring it in; see the `DagActionStoreChangeMonitorFactory`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -136,14 +144,22 @@ protected void processMessage(DecodeableKafkaRecord message) {
// We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
// {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
try {
+
if (operation.equals("INSERT")) {
if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
log.info("Received insert dag action and about to send resume flow request");
dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
+ //TODO: add a flag for if condition only if multi-active is enabled
this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
log.info("Received insert dag action and about to send kill flow request");
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
Review Comment:
Or are we trying a "dummy kill" in the new refactor? If so, let's add a comment that the change below doesn't carry out the action and is still in testing.
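If the new path is meant as a "dummy kill" for now, one way to make that explicit in code is to keep the existing `DagManager` call as the path that actually cancels the flow, and to enqueue the action for the new path only behind a flag, as the TODO about a multi-active flag hints. `KillRouter`, its flag and the `Consumer` standing in for `dagManager.handleKillFlowRequest` are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: KillRouter is not part of Gobblin. The Consumer stands in for
// dagManager.handleKillFlowRequest, and the queue stands in for the new DagTaskStream path.
final class KillRouter {
  private final boolean isMultiActiveEnabled;
  private final Consumer<String> legacyKillHandler;
  private final BlockingQueue<String> newPathQueue = new LinkedBlockingQueue<>();

  KillRouter(boolean isMultiActiveEnabled, Consumer<String> legacyKillHandler) {
    this.isMultiActiveEnabled = isMultiActiveEnabled;
    this.legacyKillHandler = legacyKillHandler;
  }

  void onKillInserted(String flowId) {
    // Existing path: this call is what actually kills the flow.
    legacyKillHandler.accept(flowId);
    if (isMultiActiveEnabled) {
      // New path ("dummy kill"): only enqueued; nothing consumes it in this sketch.
      newPathQueue.offer(flowId);
    }
  }

  int pendingNewPathActions() {
    return newPathQueue.size();
  }
}
```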
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+ /**
+ * Currently, it is handling just the launch of a {@link Dag} request via REST client for adhoc flows.
+ * The eventTimestamp for adhoc flows will always be 0 (zero), while -1 would indicate failures.
+ * Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
+ * Future implementations will cover launch of flows through the scheduler too!
Review Comment:
Agree with Kip's suggestion here to make it read like an interface Javadoc:
"Handles launching of ... via ..."
`@param` followed by requirements
`@throws` if input doesn't match
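Following that suggestion, the `launchFlow` Javadoc might read as below. This is a sketch: `FlowLauncher`, `validateLaunchArgs` and the non-empty checks on `flowGroup` and `flowName` are hypothetical; only the `eventTimestamp >= 0` requirement comes from the existing comment.

```java
// Hypothetical rewrite of the Javadoc in the interface style the reviewers suggest;
// the IllegalArgumentException contract is an assumption, not the PR's behavior.
interface FlowLauncher {
  /**
   * Handles launching of a {@code Dag} requested via the REST client for adhoc flows.
   *
   * @param flowGroup group of the flow to launch; must be non-empty
   * @param flowName name of the flow to launch; must be non-empty
   * @param eventTimestamp time of the launch event; must be {@code >= 0} (adhoc flows use 0)
   * @throws IllegalArgumentException if any argument does not meet the requirements above
   */
  void launchFlow(String flowGroup, String flowName, long eventTimestamp);

  /** Shared validation that implementations of launchFlow can call first. */
  static void validateLaunchArgs(String flowGroup, String flowName, long eventTimestamp) {
    if (flowGroup == null || flowGroup.isEmpty() || flowName == null || flowName.isEmpty()) {
      throw new IllegalArgumentException("flowGroup and flowName must be non-empty");
    }
    if (eventTimestamp < 0) {
      throw new IllegalArgumentException("eventTimestamp must be >= 0 but was " + eventTimestamp);
    }
  }
}
```

Putting the checks in a static interface method keeps the documented `@throws` contract and its enforcement next to each other.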
Issue Time Tracking
-------------------
Worklog Id: (was: 883014)
Time Spent: 13.5h (was: 13h 20m)
> Refactor code to move current in-memory references to new design for REST calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 13.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira (v8.20.10#820010)