[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=908455&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908455
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 06/Mar/24 02:30
Start Date: 06/Mar/24 02:30
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3858:
URL: https://github.com/apache/gobblin/pull/3858#discussion_r1513704254
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * Defines an individual task on a Dag.
+ * Upon successful completion of the corresponding {@link DagProc#process(DagManagementStateStore)},
+ * {@link DagTask#conclude()} must be called.
+ */
+
+@Alpha
+public abstract class DagTask<T> {
+ @Getter public DagActionStore.DagAction dagAction;
+ private MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus;
+ @Getter DagManager.DagId dagId;
Review Comment:
`protected`? `final`?
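The following is a minimal sketch of the `protected final` fields the reviewer asks about. `DagAction` and `DagId` are simplified, hypothetical stand-ins for `DagActionStore.DagAction` and `DagManager.DagId`, and the `_`-joined id only approximates `DagManagerUtils.generateDagId`. Marking the fields `final` documents that the constructor sets them once. Lombok's `@Getter` still generates a public accessor for a `protected` field, so the extra `public` on the field adds nothing.

```java
import java.util.Objects;

// Hypothetical stand-in for DagActionStore.DagAction, reduced to the fields DagTask reads.
final class DagAction {
  final String flowGroup;
  final String flowName;
  final long flowExecutionId;

  DagAction(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

// Hypothetical stand-in for DagManager.DagId.
final class DagId {
  private final String value;

  DagId(String value) { this.value = value; }

  @Override
  public String toString() { return value; }
}

abstract class DagTask<T> {
  // protected: visible to subclasses such as LaunchDagTask; final: assigned exactly once below.
  protected final DagAction dagAction;
  protected final DagId dagId;

  protected DagTask(DagAction dagAction) {
    this.dagAction = Objects.requireNonNull(dagAction, "dagAction");
    // Approximation of DagManagerUtils.generateDagId(...); the real id format may differ.
    this.dagId = new DagId(dagAction.flowGroup + "_" + dagAction.flowName + "_" + dagAction.flowExecutionId);
  }

  // Equivalent of what Lombok's @Getter would generate.
  DagAction getDagAction() { return dagAction; }

  DagId getDagId() { return dagId; }
}
```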
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -54,21 +59,50 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
// dagToJobs holds a map of dagId to running jobs of that dag
 private final Map<DagManager.DagId, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new ConcurrentHashMap<>();
 private final Map<DagManager.DagId, Long> dagToDeadline = new ConcurrentHashMap<>();
- private final DagStateStore dagStateStore;
- private final DagStateStore failedDagStateStore;
+ private DagStateStore dagStateStore;
+ private DagStateStore failedDagStateStore;
+ private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
+ Map<URI, TopologySpec> topologySpecMap;
+ private final Config config;
 private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
 public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ FlowCatalog flowCatalog;
- public MostlyMySqlDagManagementStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) throws IOException {
- this.dagStateStore = createDagStateStore(config, topologySpecMap);
- this.failedDagStateStore = createDagStateStore(
- ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+ @Inject
+ public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog) throws IOException {
this.quotaManager = new MysqlUserQuotaManager(config);
- this.quotaManager.init(getDags());
+ this.config = config;
+ this.flowCatalog = flowCatalog;
+ }
+
+ @Override
+ // It should be called after topology spec map is set
+ public synchronized void start() throws IOException {
Review Comment:
if the reason to have a separate `start()` is that the topologySpecMap must
be set, why not combine this method w/ `setTopologySpecMap` so it takes a TSM
param?
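One way the suggestion could look is sketched below, under assumptions. `setTopologySpecMap` is folded into `start`, so the signature enforces the precondition stated in the comment `// It should be called after topology spec map is set`. `TopologySpec` and `DagStoreStartupSketch` are hypothetical stand-ins, and the state-store creation step is left as a comment because `createDagStateStore` is not shown in full here.

```java
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for TopologySpec.
final class TopologySpec {
  final String name;

  TopologySpec(String name) { this.name = name; }
}

final class DagStoreStartupSketch {
  private Map<URI, TopologySpec> topologySpecMap;
  private boolean dagStoresInitialized = false;

  // Requiring the map as a parameter makes it impossible to start before the map is known.
  synchronized void start(Map<URI, TopologySpec> topologySpecMap) throws IOException {
    Objects.requireNonNull(topologySpecMap, "topologySpecMap");
    if (dagStoresInitialized) {
      return; // idempotent: a repeated activation is a no-op
    }
    // Defensive copy so later changes by the caller do not leak into the store.
    this.topologySpecMap = Collections.unmodifiableMap(new HashMap<>(topologySpecMap));
    // ... create dagStateStore and failedDagStateStore from config + this.topologySpecMap here ...
    dagStoresInitialized = true;
  }

  synchronized boolean isStarted() { return dagStoresInitialized; }

  synchronized int topologyCount() {
    return topologySpecMap == null ? 0 : topologySpecMap.size();
  }
}
```

Because `DagManagementStateStore` declares `start()` without parameters, this shape would also change that interface's default method.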
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+public interface DagTaskStream extends Iterator<DagTask> {
+ boolean hasNext();
+ DagTask next();
+}
Review Comment:
let's add javadoc
also the body could be empty, as these methods already come in from
`Iterator<DagTask>`
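The following sketch shows the interface with javadoc and an empty body. The javadoc wording is drawn from the `DagProcessingEngine` description elsewhere in this PR. `DagTask` is reduced to a hypothetical stand-in, and `InMemoryDagTaskStream` is a hypothetical non-blocking implementation that exists only to exercise the interface. The javadoc is also a natural place to state that `next()` may block, because that departs from the usual `Iterator` contract.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for DagTask, reduced to an id.
final class DagTask {
  final String dagId;

  DagTask(String dagId) { this.dagId = dagId; }
}

/**
 * A source of {@code DagTask}s for the processing engine to work on.
 * Each task returned by {@code next()} is expected to carry a time-limited lease
 * conferring the exclusive right to perform its work.
 * Unlike the usual {@link Iterator} contract, implementations may block in
 * {@code next()} until a task becomes available.
 */
interface DagTaskStream extends Iterator<DagTask> {
  // Intentionally empty: hasNext() and next() are already declared by Iterator<DagTask>.
}

// Single-threaded, non-blocking implementation used only to exercise the interface.
final class InMemoryDagTaskStream implements DagTaskStream {
  private final ArrayDeque<DagTask> tasks = new ArrayDeque<>();

  void add(DagTask task) { tasks.addLast(task); }

  @Override
  public boolean hasNext() { return !tasks.isEmpty(); }

  @Override
  public DagTask next() {
    DagTask task = tasks.pollFirst();
    if (task == null) {
      throw new NoSuchElementException("no pending dag task");
    }
    return task;
  }
}
```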
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
Review Comment:
rename
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,56 @@
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask} by first initializing its state, performing
+ * actions based on the type of {@link DagTask} and finally submitting an event to the executor.
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+ protected static final MetricContext metricContext = Instrumented.getMetricContext(new State(), DagProc.class);
+ protected static final EventSubmitter eventSubmitter = new EventSubmitter.Builder(
+ metricContext, "org.apache.gobblin.service").build();
+
+ public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
+ S state = initialize(dagManagementStateStore); // todo - retry
+ T result = act(dagManagementStateStore, state); // todo - retry
+ commit(dagManagementStateStore, result); // todo - retry
+ }
+
+ protected abstract S initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
+
+ protected abstract T act(DagManagementStateStore dagManagementStateStore, S state) throws IOException;
+
+ // todo - commit the modified dags to the persistent store, maybe not required for InMem dagManagementStateStore
+ protected abstract void commit(DagManagementStateStore dagManagementStateStore, T result);
Review Comment:
I'm not sure where the `commit` came in. I'd recommend that whatever we're
doing against the state store in `act` be committed within. [I recall](https://github.com/apache/gobblin/pull/3776/files#diff-f77e59443bfc1a4d7d559486eb0ad62222bce017c0d7b547940dad417907606dR45)
the original third `DagProc` method being `sendNotification` (which produces
the GTE for the `KafkaJobStatusMonitor`, where needed).
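The sketch below shows the shape the reviewer describes. It assumes the three phases become `initialize`, `act` (which persists its own state-store changes before returning), and `sendNotification`. `InMemoryStateStore` and `LaunchDagProcSketch` are hypothetical. The real notification would emit the GTE for `KafkaJobStatusMonitor`; here it is stubbed as a list of strings.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for DagManagementStateStore.
final class InMemoryStateStore {
  final List<String> committed = new ArrayList<>();

  void persist(String dagId) throws IOException {
    committed.add(dagId);
  }
}

abstract class DagProc<S, T> {
  // Template method: the three phases always run in this order.
  final void process(InMemoryStateStore store) throws IOException {
    S state = initialize(store);
    T result = act(store, state); // act() has already persisted its changes when it returns
    sendNotification(result);
  }

  protected abstract S initialize(InMemoryStateStore store) throws IOException;

  // Performs the work and commits whatever it changed in the state store before returning.
  protected abstract T act(InMemoryStateStore store, S state) throws IOException;

  // Emits any event downstream consumers need (e.g. the GTE for KafkaJobStatusMonitor).
  protected abstract void sendNotification(T result) throws IOException;
}

final class LaunchDagProcSketch extends DagProc<String, String> {
  final List<String> notifications = new ArrayList<>();
  private final String dagId;

  LaunchDagProcSketch(String dagId) { this.dagId = dagId; }

  @Override
  protected String initialize(InMemoryStateStore store) {
    return dagId;
  }

  @Override
  protected String act(InMemoryStateStore store, String state) throws IOException {
    store.persist(state); // commit inside act(), so no separate commit() phase is needed
    return state;
  }

  @Override
  protected void sendNotification(String result) {
    notifications.add("launched " + result);
  }
}
```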
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -36,6 +40,26 @@
*/
@Alpha
public interface DagManagementStateStore {
+
+ /**
+ * Does any initial setup work. It should usually be called after the initialization.
+ */
+ default void start() throws IOException {
+ initQuota(getDags());
+ }
+
+ /**
+ * Returns a {@link FlowSpec} for the given URI.
+ * @throws SpecNotFoundException if the spec is not found
+ */
+ FlowSpec getSpecs(URI uri) throws SpecNotFoundException;
Review Comment:
why name this `getSpecs` rather than `getSpec`? truly, most precise would
be `getFlowSpec`.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * NewDagManager has these functionalities :
+ * a) interact with {@link DagManagementStateStore} to update/retrieve dags, checkpoint
+ * b) add {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the {@link DagTaskStream}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+ private final Config config;
+ @Getter private final EventSubmitter eventSubmitter;
+ @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+ private volatile boolean isActive = false;
+
+ @Inject(optional=true)
+ protected Optional<DagActionStore> dagActionStore;
+ @Inject
+ @Getter DagManagementStateStore dagManagementStateStore;
+ private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+ private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+ @Inject
+ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, DagManagementStateStore dagManagementStateStore) {
+ this.config = config;
+ this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
+ MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+ this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+ }
+
+ public void setActive(boolean active) {
+ if (this.isActive == active) {
+ log.info("DagManagementTaskStreamImpl already {}, skipping further actions.", (!active) ? "inactive" : "active");
+ }
+ this.isActive = active;
+ try {
+ if (this.isActive) {
+ log.info("Activating DagManagementTaskStreamImpl.");
+ //Initializing state store for persisting Dags.
+ this.dagManagementStateStore.start();
+ dagManagerMetrics.activate();
+ } else { //Mark the DagManager inactive.
+ log.info("Inactivating the DagManagementTaskStreamImpl. Shutting down all DagManager threads");
+ dagManagerMetrics.cleanup();
+ }
+ } catch (IOException e) {
+ log.error("Exception encountered when activating the DagManagementTaskStreamImpl", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public synchronized void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
+ // TODO: Used to track missing dag issue, remove later as needed
+ log.info("Add dagAction{}", dagAction);
+ if (!isActive) {
+ log.warn("Skipping add dagAction because this instance of DagManagementTaskStreamImpl is not active for dag: {}",
+ dagAction);
+ return;
+ }
+
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+ String dagIdString = dagId.toString();
+ if (this.dagManagementStateStore.containsDag(dagId)) {
+ log.warn("Already tracking a dag with dagId {}, skipping.", dagIdString);
+ return;
+ }
+
+ // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
+ // to avoid duplicate executions upon leadership change
+ if (this.dagActionStore.isPresent()) {
+ this.dagActionStore.get().deleteDagAction(dagAction);
+ }
+
+ if (!this.dagActionQueue.offer(dagAction)) {
+ throw new RuntimeException("Could not add dag action " + dagAction + " to the queue");
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !this.dagActionQueue.isEmpty();
+ }
+
+ @Override
+ public DagTask<DagProc> next() {
+ try {
+ DagActionStore.DagAction dagAction = this.dagActionQueue.take(); //`take` blocks till element is not available
+ // todo reconsider the use of MultiActiveLeaseArbiter
+ //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+ // todo - uncomment after flow trigger handler provides such an api
+ //Properties jobProps = getJobProperties(dagAction);
+ //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+ //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ // can it return null? is this iterator allowed to return null?
+ return createDagTask(dagAction, new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));
+ //}
+ } catch (Throwable t) {
Review Comment:
catching `Throwable` is pretty extreme, given it would swallow OOM and other
low-level `Error`s. what specifically are the kinds of exceptions you're on
the lookout for?
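The following is a narrower alternative to `catch (Throwable t)`, sketched under assumptions. The only checked exception `BlockingQueue.take()` throws is `InterruptedException`, so the sketch handles exactly that. It restores the thread's interrupt flag so the owning executor can observe shutdown, and lets `Error`s such as `OutOfMemoryError` propagate. `DagActionStreamSketch` is hypothetical, dag actions are reduced to strings, and wrapping in `IllegalStateException` is one possible choice rather than the PR's. Whatever `createDagTask` itself may throw is not visible here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class DagActionStreamSketch {
  private final BlockingQueue<String> dagActionQueue = new LinkedBlockingQueue<>();

  void add(String dagAction) {
    dagActionQueue.add(dagAction);
  }

  /** Blocks until a dag action is available. */
  String next() {
    try {
      return dagActionQueue.take();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the calling thread (or its executor) can see the shutdown request.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a dag action", e);
    }
    // Nothing broader is caught: Errors such as OutOfMemoryError propagate unchanged.
  }
}
```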
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+ @Override
+ public synchronized void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
+ // TODO: Used to track missing dag issue, remove later as needed
+ log.info("Add dagAction{}", dagAction);
+ if (!isActive) {
+ log.warn("Skipping add dagAction because this instance of DagManagementTaskStreamImpl is not active for dag: {}",
+ dagAction);
+ return;
+ }
+
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+ String dagIdString = dagId.toString();
+ if (this.dagManagementStateStore.containsDag(dagId)) {
+ log.warn("Already tracking a dag with dagId {}, skipping.", dagIdString);
+ return;
+ }
+
+ // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
+ // to avoid duplicate executions upon leadership change
+ if (this.dagActionStore.isPresent()) {
+ this.dagActionStore.get().deleteDagAction(dagAction);
+ }
Review Comment:
naively, it seems that with multiple active participants, they'd all try to
delete the same dag action from the shared store.
or are you relying on some trick, such as `dagActionStore.isPresent()`
being true only for active participants?
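The sketch below shows one way concurrent deletes could be made harmless. It assumes a hypothetical `deleteDagAction` that reports whether this caller actually removed the action; against MySQL, the analogue would be checking the affected-row count that JDBC's `executeUpdate` returns for the `DELETE`. Only the participant whose delete succeeds goes on to enqueue the action, and the others skip it. `SharedDagActionStore` and `Participant` are illustrative names, and this is not how the PR behaves, since the real `DagActionStore.deleteDagAction` signature is not shown here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the shared DagActionStore.
final class SharedDagActionStore {
  private final Set<String> actions = ConcurrentHashMap.newKeySet();

  void add(String dagAction) {
    actions.add(dagAction);
  }

  /** Returns true only for the single caller whose remove actually took effect. */
  boolean deleteDagAction(String dagAction) {
    return actions.remove(dagAction);
  }
}

// Hypothetical participant: it treats a successful delete as its claim on the action.
final class Participant {
  private final SharedDagActionStore store;

  Participant(SharedDagActionStore store) {
    this.store = store;
  }

  /** Returns true if this participant won the delete and should enqueue the action. */
  boolean tryClaim(String dagAction) {
    return store.deleteDagAction(dagAction);
  }
}
```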
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type of {@link DagTask}.
+ * Each {@link DagTask} returned from the {@link DagTaskStream} comes with a time-limited lease conferring the exclusive
+ * right to perform the work of the task.
+ * The {@link DagProcFactory} transforms each {@link DagTask} into a specific, concrete {@link DagProc}, which
+ * encapsulates all processing inside {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+
+ @Getter private final DagTaskStream dagTaskStream;
+ @Getter DagManagementStateStore dagManagementStateStore;
+
+ @Inject
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
+ DagManagementStateStore dagManagementStateStore) {
+ Integer numThreads = ConfigUtils.getInt
+ (config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY, ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
+ ScheduledExecutorService scheduledExecutorPool =
+ Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+
+ for (int i=0; i < numThreads; i++) {
+ DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+ scheduledExecutorPool.submit(dagProcEngineThread);
+ }
+ }
+
+ @AllArgsConstructor
+ @VisibleForTesting
+ static class DagProcEngineThread implements Runnable {
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+
+ @Override
+ public void run() {
+ while (true) {
+ DagTask<DagProc> dagTask = dagTaskStream.next(); // blocking call
+ if (dagTask == null) {
+ log.warn("Received a null dag task, ignoring.");
+ continue;
+ }
+ DagProc dagProc = dagTask.host(dagProcFactory);
+ try {
+ // todo - add retries
+ dagProc.process(dagManagementStateStore);
+ dagTask.conclude();
+ } catch (Throwable t) {
+ log.error("DagProcEngineThread encountered error ", t);
Review Comment:
let's include an identifier in the log message, such as `dagTask.getDagId()`
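The following is a minimal, runnable illustration of the suggestion. It uses `java.util.logging` so it has no dependencies, and `SketchDagTask` and `DagProcEngineWorker` are hypothetical. With the SLF4J logger the PR already uses, the equivalent would be `log.error("DagProcEngineThread encountered error processing dag task {}", dagTask.getDagId(), t)`, because SLF4J treats a trailing `Throwable` argument as the exception to log. The sketch also narrows the catch to `RuntimeException`, in line with the earlier comment about `Throwable`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical minimal task: an id plus the work to run.
final class SketchDagTask {
  final String dagId;
  final Runnable work;

  SketchDagTask(String dagId, Runnable work) {
    this.dagId = dagId;
    this.work = work;
  }
}

final class DagProcEngineWorker {
  static final Logger LOG = Logger.getLogger(DagProcEngineWorker.class.getName());

  /** Runs one task; on failure, logs the dagId together with the exception and returns false. */
  static boolean processOne(SketchDagTask task) {
    try {
      task.work.run();
      return true;
    } catch (RuntimeException e) {
      LOG.log(Level.SEVERE, "DagProcEngineThread encountered error processing dagId " + task.dagId, e);
      return false;
    }
  }
}
```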
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java:
##########
@@ -0,0 +1,60 @@
+@Alpha
+public abstract class DagTask<T> {
+ @Getter public DagActionStore.DagAction dagAction;
+ private MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus;
+ @Getter DagManager.DagId dagId;
+
+ public DagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ this.dagAction = dagAction;
+ this.leaseObtainedStatus = leaseObtainedStatus;
+ this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+ }
+
+ public abstract T host(DagTaskVisitor<T> visitor);
+
+ /**
+ * Any cleanup work, e.g. releasing lease if it was acquired earlier, may be done in this method.
+ * Returns true if concluding dag task finished successfully otherwise false.
+ */
+ // todo call it from the right place
+ public abstract boolean conclude() throws IOException;
Review Comment:
if you already have `leaseObtainedStatus` on hand, why can't you implement
this here? AFAICT, it could even be `final` here.
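The sketch below illustrates the reviewer's point. Because the base class already holds the lease status, `conclude()` can be implemented once and marked `final`. `LeaseHandle` and its `complete()` method are hypothetical stand-ins for `MultiActiveLeaseArbiter.LeaseObtainedStatus`; whatever method the real class exposes for completing a lease would go in their place. `LaunchTaskSketch` is likewise illustrative.

```java
import java.io.IOException;

/** Hypothetical stand-in for MultiActiveLeaseArbiter.LeaseObtainedStatus. */
interface LeaseHandle {
  /** Marks the leased work as done; returns true on success. */
  boolean complete() throws IOException;
}

// T mirrors the original class's visitor result type; it is unused in this sketch.
abstract class DagTask<T> {
  private final LeaseHandle leaseObtainedStatus;

  protected DagTask(LeaseHandle leaseObtainedStatus) {
    this.leaseObtainedStatus = leaseObtainedStatus;
  }

  /**
   * Releases the lease acquired for this task. Declared final so every subclass
   * concludes the same way and none can skip releasing the lease.
   */
  public final boolean conclude() throws IOException {
    return leaseObtainedStatus.complete();
  }
}

// A concrete task type inherits conclude() unchanged.
final class LaunchTaskSketch extends DagTask<String> {
  LaunchTaskSketch(LeaseHandle lease) {
    super(lease);
  }
}
```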
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+
+
+/**
+ * A {@link DagTask} responsible to handle launch tasks.
+ */
+
+@Slf4j
Review Comment:
I don't see you logging, so I suggest not adding the annotation
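The following is a sketch of a `LaunchDagTask` body without `@Slf4j`. It also shows the double dispatch that `DagProcessingEngine` relies on when it calls `dagTask.host(dagProcFactory)`. The visitor method name `meet` and `DagProcFactorySketch` are hypothetical, and the "proc" is reduced to a descriptive string rather than a real `LaunchDagProc`.

```java
// Hypothetical visitor: one method per concrete task type (only launch is shown).
interface DagTaskVisitor<T> {
  T meet(LaunchDagTask launchDagTask);
}

abstract class DagTask<T> {
  protected final String dagId;

  protected DagTask(String dagId) {
    this.dagId = dagId;
  }

  // Double dispatch: each subclass calls the visitor method for its own type.
  abstract T host(DagTaskVisitor<T> visitor);
}

// No @Slf4j: nothing in this class logs, so the annotation would only add an unused logger field.
final class LaunchDagTask extends DagTask<String> {
  LaunchDagTask(String dagId) {
    super(dagId);
  }

  @Override
  String host(DagTaskVisitor<String> visitor) {
    return visitor.meet(this);
  }
}

// Hypothetical factory playing DagProcFactory's role; it returns a description instead of a DagProc.
final class DagProcFactorySketch implements DagTaskVisitor<String> {
  @Override
  public String meet(LaunchDagTask launchDagTask) {
    return "LaunchDagProc(" + launchDagTask.dagId + ")";
  }
}
```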
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * NewDagManager has these functionalities :
+ * a) interact with {@link DagManagementStateStore} to update/retrieve dags, checkpoint
+ * b) add {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the {@link DagTaskStream}
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+ private final Config config;
+ @Getter private final EventSubmitter eventSubmitter;
+ @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+ private volatile boolean isActive = false;
+
+ @Inject(optional=true)
+ protected Optional<DagActionStore> dagActionStore;
+ @Inject
+ @Getter DagManagementStateStore dagManagementStateStore;
+ private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
+ private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+ @Inject
+ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore, DagManagementStateStore dagManagementStateStore) {
+ this.config = config;
+ this.dagActionStore = dagActionStore;
+ this.dagManagementStateStore = dagManagementStateStore;
+ MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+ this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+ }
+
+ public void setActive(boolean active) {
+ if (this.isActive == active) {
+ log.info("DagManagementTaskStreamImpl already {}, skipping further actions.", (!active) ? "inactive" : "active");
+ }
+ this.isActive = active;
+ try {
+ if (this.isActive) {
+ log.info("Activating DagManagementTaskStreamImpl.");
+ //Initializing state store for persisting Dags.
+ this.dagManagementStateStore.start();
+ dagManagerMetrics.activate();
+ } else { //Mark the DagManager inactive.
+ log.info("Inactivating the DagManagementTaskStreamImpl. Shutting down all DagManager threads");
+ dagManagerMetrics.cleanup();
+ }
+ } catch (IOException e) {
+ log.error("Exception encountered when activating the DagManagementTaskStreamImpl", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public synchronized void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
+ // TODO: Used to track missing dag issue, remove later as needed
+ log.info("Add dagAction{}", dagAction);
+ if (!isActive) {
+ log.warn("Skipping add dagAction because this instance of DagManagementTaskStreamImpl is not active for dag: {}",
+ dagAction);
+ return;
+ }
+
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+ String dagIdString = dagId.toString();
+ if (this.dagManagementStateStore.containsDag(dagId)) {
+ log.warn("Already tracking a dag with dagId {}, skipping.", dagIdString);
+ return;
+ }
+
+ // After persisting the dag, its status will be tracked by active dagManagers so the action should be deleted
+ // to avoid duplicate executions upon leadership change
+ if (this.dagActionStore.isPresent()) {
+ this.dagActionStore.get().deleteDagAction(dagAction);
+ }
+
+ if (!this.dagActionQueue.offer(dagAction)) {
+ throw new RuntimeException("Could not add dag action " + dagAction + " to the queue");
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !this.dagActionQueue.isEmpty();
Review Comment:
This should be an infinite stream, hence `hasNext()` should always return `true`
(even if `next()` would then have to block because `dagActionQueue` doesn't yet have any elements).
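To illustrate the infinite-stream contract, here is a sketch assuming the stream follows `java.util.Iterator`'s `hasNext()`/`next()` shape (as the `@Override` on `hasNext()` suggests): `hasNext()` always returns `true`, and `next()` waits on `BlockingQueue.take()`. `BlockingTaskStream` and `add()` are hypothetical names.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical infinite task stream: never exhausted, next() blocks until a task arrives. */
final class BlockingTaskStream<T> implements Iterator<T> {
  // Created without a capacity, so it is bounded only by Integer.MAX_VALUE.
  private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

  /** Enqueues a task; offer() only fails if Integer.MAX_VALUE elements are already queued. */
  boolean add(T task) {
    return queue.offer(task);
  }

  /** The stream never ends, so a next element always exists, even if obtaining it must wait. */
  @Override
  public boolean hasNext() {
    return true;
  }

  /** Blocks until a task is available, then returns tasks in FIFO order. */
  @Override
  public T next() {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
      throw new IllegalStateException("Interrupted while waiting for the next task", e);
    }
  }
}
```

With this shape, the consumer loop in the engine would not need a `null` check on the result of `next()`.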
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -0,0 +1,159 @@
+ @Override
+ public synchronized void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
+ // TODO: Used to track missing dag issue, remove later as needed
+ log.info("Add dagAction{}", dagAction);
+ if (!isActive) {
+ log.warn("Skipping add dagAction because this instance of DagManagementTaskStreamImpl is not active for dag: {}",
+ dagAction);
+ return;
+ }
+
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+ String dagIdString = dagId.toString();
+ if (this.dagManagementStateStore.containsDag(dagId)) {
+ log.warn("Already tracking a dag with dagId {}, skipping.", dagIdString);
+ return;
Review Comment:
I'm unclear on this check: what are we guarding against?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -0,0 +1,49 @@
+@Slf4j
+public class LaunchDagTask extends DagTask<LaunchDagProc> {
+ public LaunchDagTask(DagActionStore.DagAction dagAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ super(dagAction, leaseObtainedStatus);
+ }
+
+
+ @Override
+ public LaunchDagProc host(DagTaskVisitor<LaunchDagProc> visitor) {
+ return visitor.meet(this);
+ }
+
+ @Override
+ public boolean conclude() {
+ // todo - release lease
+ return true;
+ }
Review Comment:
As suggested, let's implement this in the base class. Or do you anticipate that
one or more of the derived classes would need a custom implementation of its own?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.linkedin.r2.util.NamedThreadFactory;
+import com.typesafe.config.Config;
+
+import javax.naming.OperationNotSupportedException;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and processing the {@link org.apache.gobblin.service.modules.flowgraph.Dag}
+ * based on the type of {@link DagTask} which is determined by the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * Each {@link DagTask} acquires a lease for the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}.
+ * The {@link DagProcFactory} then provides the appropriate {@link DagProc} associated with the {@link DagTask}.
+ * The actual work or processing is done by the {@link DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+ public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+ public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
+ public static final String NUM_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+ private static final Integer DEFAULT_NUM_THREADS = 3;
+
+ @Getter private final DagTaskStream dagTaskStream;
+ @Getter DagManagementStateStore dagManagementStateStore;
+
+ @Inject
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
+ DagManagementStateStore dagManagementStateStore, Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter) {
+ Integer numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+ ScheduledExecutorService scheduledExecutorPool =
+ Executors.newScheduledThreadPool(numThreads, new NamedThreadFactory("DagProcessingEngineThread"));
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+
+ for (int i=0; i < numThreads; i++) {
+ DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+ scheduledExecutorPool.submit(dagProcEngineThread);
+ }
+ }
+
+ public void addDagNodeToRetry(Dag.DagNode<JobExecutionPlan> dagNode) throws OperationNotSupportedException {
+ // todo - how to add dag action for a dag node? should we create a dag node action? right now dag action is synonymous to flow action
+ // this.dagTaskStream.addDagTask(new RetryDagTask(dagNode));
+ throw new OperationNotSupportedException();
+ }
+
+ @AllArgsConstructor
+ private static class DagProcEngineThread implements Runnable {
+
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+
+ @Override
+ public void run() {
+ while (true) {
+ DagTask dagTask = dagTaskStream.next(); // blocking call
+ if (dagTask == null) {
+ continue;
+ }
+ DagProc dagProc = dagTask.host(dagProcFactory);
+ try {
+ // todo - add retries
+ dagProc.process(dagManagementStateStore);
+ } catch (Throwable t) {
+ log.error("DagProcEngineThread encountered error ", t);
+ }
+ // todo mark lease success and releases it
+ //dagTaskStream.complete(dagTask);
+ }
Review Comment:
Performance-wise I'm not too concerned, as the reminder queue would only grow
by the value of `ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY`.
The most important consideration is to confirm how the participant doing the
processing can be certain not to "leak" tasks upon processing failure.
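One way to make the no-leak property checkable, sketched under assumptions: every path through the worker ends in exactly one of two outcomes, either the lease is marked successful after processing, or the task is handed to a retry path. `SketchTask`, `markLeaseSuccess()`, `LeakFreeWorker` and `retrySink` are hypothetical. In a multi-active deployment the retry path could instead be to leave the lease uncompleted so that it expires and a reminder re-triggers the action; whether the arbiter behaves that way is an assumption, not something this diff shows.

```java
import java.util.function.Consumer;

/** Hypothetical task: the processing work plus lease completion. */
interface SketchTask {
  /** The actual work, e.g. what a DagProc would do. */
  void process() throws Exception;

  /** Records that the lease holder finished the task; only called after process() succeeded. */
  void markLeaseSuccess();
}

/** Hypothetical worker: every path either completes the lease or hands the task to retrySink. */
final class LeakFreeWorker {
  private final int maxAttempts;
  private final Consumer<SketchTask> retrySink;

  LeakFreeWorker(int maxAttempts, Consumer<SketchTask> retrySink) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    this.maxAttempts = maxAttempts;
    this.retrySink = retrySink;
  }

  /** Returns true if the task was processed and its lease marked successful. */
  boolean handle(SketchTask task) {
    boolean succeeded = false;
    for (int attempt = 1; attempt <= maxAttempts && !succeeded; attempt++) {
      try {
        task.process();
        succeeded = true;
      } catch (Exception e) {
        // A real engine would log e here and likely back off before the next attempt.
      }
    }
    if (succeeded) {
      task.markLeaseSuccess();
    } else {
      retrySink.accept(task); // never silently drop a task that failed every attempt
    }
    return succeeded;
  }
}
```

Marking the lease successful outside the retry loop also avoids re-running `process()` if lease completion itself fails.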
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+public interface DagTaskVisitor<T> {
+ T meet(LaunchDagTask launchDagTask);
+}
Review Comment:
Please still add javadoc!
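For illustration only, a documented visitor might read as follows. The javadoc wording, the hypothetical `SketchKillTask` second type, and the generic-method `host` signature (the PR instead parameterizes `DagTask<T>`) are all assumptions. The sketch shows the double dispatch the javadoc should explain: each task calls back into the `meet` overload for its own type.

```java
/**
 * Visitor over the concrete task types. Each task calls back into the overload
 * matching its own type (double dispatch), so the visitor can produce a
 * type-specific result, such as the matching proc, without instanceof checks.
 *
 * @param <T> the type produced for each visited task
 */
interface SketchDagTaskVisitor<T> {
  /** Produces the result for a launch task. */
  T meet(SketchLaunchTask launchTask);

  /** Produces the result for a kill task (hypothetical second task type). */
  T meet(SketchKillTask killTask);
}

/** Base task: accepts a visitor and dispatches to the overload for its concrete type. */
abstract class SketchTaskBase {
  abstract <T> T host(SketchDagTaskVisitor<T> visitor);
}

final class SketchLaunchTask extends SketchTaskBase {
  @Override
  <T> T host(SketchDagTaskVisitor<T> visitor) {
    return visitor.meet(this); // resolves to meet(SketchLaunchTask) at compile time
  }
}

final class SketchKillTask extends SketchTaskBase {
  @Override
  <T> T host(SketchDagTaskVisitor<T> visitor) {
    return visitor.meet(this); // resolves to meet(SketchKillTask) at compile time
  }
}

/** Example visitor mapping each task type to a proc name; a proc factory would return a proc instead. */
final class ProcNameVisitor implements SketchDagTaskVisitor<String> {
  @Override public String meet(SketchLaunchTask launchTask) { return "LaunchDagProc"; }
  @Override public String meet(SketchKillTask killTask) { return "KillDagProc"; }
}
```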
Issue Time Tracking
-------------------
Worklog Id: (was: 908455)
Time Spent: 27h 20m (was: 27h 10m)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 27h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)