This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 02eca4f8d [GOBBLIN-1910] Pr dag refactor stage 2 (#3858)
02eca4f8d is described below
commit 02eca4f8de577930b4ba88f138b3631102308524
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Sat Mar 9 08:45:45 2024 -0800
[GOBBLIN-1910] Pr dag refactor stage 2 (#3858)
* add new components to take responsibilities away from DagManager
* rebase the latest commit
* address review comments
* add unit test
* add flow catalog to DagManagementStateStore
---------
Co-authored-by: Meeth Gala <[email protected]>
---
.../apache/gobblin/service/ServiceConfigKeys.java | 5 +
.../apache/gobblin/metrics/ServiceMetricNames.java | 2 +
.../gobblin/runtime/metrics/RuntimeMetrics.java | 1 +
.../modules/orchestration/DagManagement.java | 33 ++++
.../orchestration/DagManagementStateStore.java | 18 ++-
.../orchestration/DagManagementTaskStreamImpl.java | 115 +++++++++++++
.../modules/orchestration/DagManagerMetrics.java | 3 +
.../modules/orchestration/DagManagerUtils.java | 2 +-
.../modules/orchestration/DagProcFactory.java | 44 +++++
.../modules/orchestration/DagProcessingEngine.java | 106 ++++++++++++
.../modules/orchestration/DagTaskStream.java | 30 ++++
.../modules/orchestration/DagTaskVisitor.java | 25 +++
.../MostlyMySqlDagManagementStateStore.java | 50 +++++-
.../modules/orchestration/proc/DagProc.java | 61 +++++++
.../modules/orchestration/proc/LaunchDagProc.java | 75 +++++++++
.../modules/orchestration/task/DagTask.java | 60 +++++++
.../modules/orchestration/task/LaunchDagTask.java | 43 +++++
.../monitoring/DagActionStoreChangeMonitor.java | 25 +--
.../DagManagementDagActionStoreChangeMonitor.java | 89 ++++++++++
...eEnabledDagActionStoreChangeMonitorFactory.java | 85 ++++++++++
.../DagManagementTaskStreamImplTest.java | 91 +++++++++++
.../orchestration/DagProcessingEngineTest.java | 180 +++++++++++++++++++++
.../MostlyMySqlDagManagementStateStoreTest.java | 9 +-
23 files changed, 1127 insertions(+), 25 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 2cfb6a017..87bda430a 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -199,4 +199,9 @@ public class ServiceConfigKeys {
public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX +
"issueRepo.class";
+
+ public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
+ public static final String DAG_PROCESSING_ENGINE_ENABLED =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
+ public static final String NUM_DAG_PROC_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
+ public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
index 962713b1c..41e242b43 100644
---
a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
+++
b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
@@ -77,4 +77,6 @@ public class ServiceMetricNames {
public static final String DAG_COUNT_MYSQL_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "MysqlDagStateStore" + ".totalDagCount";
public static final String DAG_COUNT_FS_DAG_STATE_COUNT =
GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "FsDagStateStore" + ".totalDagCount";
+
+ public static final String DAG_PROCESSING_EXCEPTION_METER =
"DagProcessingException";
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 0d6dacf9a..11215a7f1 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -59,6 +59,7 @@ public class RuntimeMetrics {
public static final String
GOBBLIN_DAG_ACTION_STORE_SUCCESSFUL_LAUNCH_SUBMISSIONS_ON_STARTUP =
DAG_ACTION_STORE_MONITOR_PREFIX + "successfulLaunchSubmissionsOnStartup";
public static final String
GOBBLIN_DAG_ACTION_STORE_FAILED_LAUNCH_SUBMISSIONS_ON_STARTUP =
DAG_ACTION_STORE_MONITOR_PREFIX + "failedLaunchSubmissionsOnStartup";
public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.errors";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_LAUNCH_EVENT_ERRORS =
DAG_ACTION_STORE_MONITOR_PREFIX + "unexpected.launch.event.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
DAG_ACTION_STORE_MONITOR_PREFIX + "produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER +
"gobblin.mysql.quota.manager.unexpected.errors";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
new file mode 100644
index 000000000..1e4b17c90
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+
+
+/**
+ * Abstraction for managing operations on a {@code Dag}.
+ * Implementations accept {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}s that are processed later;
+ * consumption of the resulting work happens through {@link DagTaskStream}.
+ */
+public interface DagManagement {
+
+  /**
+   * Accepts a dag action so that it can be processed later.
+   *
+   * @param dagAction the action to accept
+   * @throws IOException implementation-specific failure to accept the action
+   */
+  void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 4903247d2..c6e45a46d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -18,13 +18,17 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
+import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.exception.QuotaExceededException;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -36,6 +40,18 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
*/
@Alpha
public interface DagManagementStateStore {
+  /**
+   * Returns the {@link FlowSpec} stored under the given URI.
+   *
+   * @param uri URI identifying the flow spec
+   * @return the flow spec for {@code uri}
+   * @throws SpecNotFoundException if the spec is not found
+   */
+  FlowSpec getFlowSpec(URI uri) throws SpecNotFoundException;
+
+  /**
+   * Removes the {@link FlowSpec} with the given URI and passes the deletion on to listeners if
+   * {@code triggerListener} is true.
+   * No-op if the flow spec was not present in the store.
+   *
+   * @param uri URI of the flow spec to remove
+   * @param headers headers passed along with the removal
+   * @param triggerListener whether listeners should be notified of the deletion
+   */
+  void removeFlowSpec(URI uri, Properties headers, boolean triggerListener);
+
/**
* Checkpoints any changes in {@link Dag} or in its {@link Dag.DagNode}s.
* e.g. on adding a failed dag in store to retry later, on submitting a dag
node to spec producer because that changes
@@ -165,4 +181,4 @@ public interface DagManagementStateStore {
* Returns true if successfully reduces the quota usage
*/
boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws
IOException;
-}
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
new file mode 100644
index 000000000..a83d1103e
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * DagManagementTaskStreamImpl implements {@link DagManagement} and {@link DagTaskStream}. It accepts
+ * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}s and iteratively provides {@link DagTask}s.
+ * <p>
+ * Accepted dag actions are buffered in an unbounded in-memory queue; {@link #next()} blocks until one is available
+ * and converts it into a {@link DagTask}.
+ */
+@Slf4j
+@Singleton
+@Data
+public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
+  private final Config config;
+  @Getter private final EventSubmitter eventSubmitter;
+  @Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();
+
+  @Inject(optional=true)
+  protected Optional<DagActionStore> dagActionStore;
+  // Unbounded queue, so offer() in addDagAction does not fail for lack of capacity.
+  private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
+
+  @Inject
+  public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore) {
+    this.config = config;
+    this.dagActionStore = dagActionStore;
+    MetricContext metricContext =
+        Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+    dagManagerMetrics.activate();
+  }
+
+  /**
+   * Enqueues a dag action; a later call to {@link #next()} returns it wrapped in a {@link DagTask}.
+   *
+   * @throws RuntimeException if the action could not be added to the queue
+   */
+  @Override
+  public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
+    // TODO: Used to track missing dag issue, remove later as needed
+    log.info("Add dagAction{}", dagAction);
+
+    if (!this.dagActionQueue.offer(dagAction)) {
+      throw new RuntimeException("Could not add dag action " + dagAction + " to the queue");
+    }
+  }
+
+  /** Always returns true: {@link #next()} blocks until a dag action arrives. */
+  @Override
+  public boolean hasNext() {
+    return true;
+  }
+
+  /**
+   * Blocks until a dag action is available and returns the corresponding {@link DagTask}.
+   *
+   * @return the next {@link DagTask}, or {@code null} if creating it failed (the failure is logged)
+   * @throws java.util.NoSuchElementException if the calling thread is interrupted while waiting; the thread's
+   *         interrupt status is restored so the caller can stop
+   */
+  @Override
+  public DagTask next() {
+    DagActionStore.DagAction dagAction;
+    try {
+      dagAction = this.dagActionQueue.take(); // `take` blocks until an element is available
+    } catch (InterruptedException e) {
+      // take() cleared the interrupt flag; restore it and end the iteration. Returning null here instead would send
+      // the caller straight back into take(), which would then fail immediately on every call.
+      Thread.currentThread().interrupt();
+      java.util.NoSuchElementException endOfStream =
+          new java.util.NoSuchElementException("Interrupted while waiting for the next dag action");
+      endOfStream.initCause(e);
+      throw endOfStream;
+    }
+    try {
+      // todo reconsider the use of MultiActiveLeaseArbiter
+      //MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction);
+      // todo - uncomment after flow trigger handler provides such an api
+      //Properties jobProps = getJobProperties(dagAction);
+      //flowTriggerHandler.getLeaseOnDagAction(jobProps, dagAction, System.currentTimeMillis());
+      //if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      // can it return null? is this iterator allowed to return null?
+      return createDagTask(dagAction,
+          new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis()));
+      //}
+    } catch (Throwable t) {
+      //TODO: need to handle exceptions gracefully
+      log.error("Error getting DagAction from the queue / creating DagTask", t);
+    }
+    return null;
+  }
+
+  /**
+   * Wraps the dag action in the {@link DagTask} matching its flow action type. Only LAUNCH is supported so far.
+   *
+   * @throws UnsupportedOperationException for any other flow action type
+   */
+  private DagTask createDagTask(DagActionStore.DagAction dagAction,
+      MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    DagActionStore.FlowActionType flowActionType = dagAction.getFlowActionType();
+
+    switch (flowActionType) {
+      case LAUNCH:
+        return new LaunchDagTask(dagAction, leaseObtainedStatus);
+      default:
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
index a5f34cff7..24e25e044 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
@@ -62,6 +62,7 @@ public class DagManagerMetrics {
private ContextAwareMeter allRunningMeter;
private ContextAwareMeter allSlaExceededMeter;
private ContextAwareMeter allStartSlaExceededMeter;
+ public ContextAwareMeter dagProcessingExceptionMeter;
// Meters representing the flows in a given state per flowgroup
private final Map<String, ContextAwareMeter> groupSuccessfulMeters =
Maps.newConcurrentMap();
private final Map<String, ContextAwareMeter> groupFailureMeters =
Maps.newConcurrentMap();
@@ -100,6 +101,8 @@ public class DagManagerMetrics {
ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
allRunningMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+ dagProcessingExceptionMeter =
metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.DAG_PROCESSING_EXCEPTION_METER));
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index e894a0b16..d9d928e19 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -353,7 +353,7 @@ public class DagManagerUtils {
return dagNode.getValue().getSpecExecutor().getUri().toString();
}
- static void emitFlowEvent(EventSubmitter eventSubmitter,
Dag<JobExecutionPlan> dag, String flowEvent) {
+ public static void emitFlowEvent(EventSubmitter eventSubmitter,
Dag<JobExecutionPlan> dag, String flowEvent) {
if (!dag.isEmpty()) {
// Every dag node will contain the same flow metadata
Config config = getDagJobConfig(dag);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
new file mode 100644
index 000000000..3729c19d5
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.inject.Singleton;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * {@link DagTaskVisitor} for transforming a specific {@link DagTask} derived
class to its companion {@link DagProc} derived class.
+ * Each {@link DagTask} needs it own {@link DagProcFactory#meet} method
overload to create {@link DagProc} that is
+ * supposed to process that {@link DagTask}.
+ */
+
+@Alpha
+@Singleton
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+ @Override
+ public LaunchDagProc meet(LaunchDagTask launchDagTask) {
+ return new LaunchDagProc(launchDagTask);
+ }
+ //todo - overload meet method for other dag tasks
+}
+
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
new file mode 100644
index 000000000..bd6aba126
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * Responsible for polling {@link DagTask}s from {@link DagTaskStream} and
processing the
+ * {@link org.apache.gobblin.service.modules.flowgraph.Dag} based on the type
of {@link DagTask}.
+ * Each {@link DagTask} returned from the {@link DagTaskStream} comes with a
time-limited lease conferring the exclusive
+ * right to perform the work of the task.
+ * The {@link DagProcFactory} transforms each {@link DagTask} into a specific,
concrete {@link DagProc}, which
+ * encapsulates all processing inside {@link
DagProc#process(DagManagementStateStore)}
+ */
+
+@Alpha
+@Slf4j
+@Singleton
+public class DagProcessingEngine {
+
+ @Getter private final DagTaskStream dagTaskStream;
+ @Getter DagManagementStateStore dagManagementStateStore;
+
+ @Inject
+ public DagProcessingEngine(Config config, DagTaskStream dagTaskStream,
DagProcFactory dagProcFactory,
+ DagManagementStateStore dagManagementStateStore) {
+ Integer numThreads = ConfigUtils.getInt
+ (config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY,
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
+ ScheduledExecutorService scheduledExecutorPool =
+ Executors.newScheduledThreadPool(numThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("DagProcessingEngineThread")));
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+
+ for (int i=0; i < numThreads; i++) {
+ // todo - set metrics for count of active DagProcEngineThread
+ DagProcEngineThread dagProcEngineThread = new
DagProcEngineThread(dagTaskStream, dagProcFactory, dagManagementStateStore);
+ scheduledExecutorPool.submit(dagProcEngineThread);
+ }
+ }
+
+ @AllArgsConstructor
+ @VisibleForTesting
+ static class DagProcEngineThread implements Runnable {
+ private DagTaskStream dagTaskStream;
+ private DagProcFactory dagProcFactory;
+ private DagManagementStateStore dagManagementStateStore;
+
+ @Override
+ public void run() {
+ while (true) {
+ DagTask dagTask = dagTaskStream.next(); // blocking call
+ if (dagTask == null) {
+ //todo - add a metrics to count the times dagTask was null
+ log.warn("Received a null dag task, ignoring.");
+ continue;
+ }
+ DagProc dagProc = dagTask.host(dagProcFactory);
+ try {
+ // todo - add retries
+ dagProc.process(dagManagementStateStore);
+ dagTask.conclude();
+ } catch (Exception e) {
+ log.error("DagProcEngineThread encountered exception while
processing dag " + dagTask.getDagId(), e);
+
DagManagementTaskStreamImpl.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
+ }
+ // todo mark lease success and releases it
+ //dagTaskStream.complete(dagTask);
+ }
+ }
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java
new file mode 100644
index 000000000..b8995fffe
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Source of the next available {@link DagTask} to process.
+ * <p>
+ * As consumed by {@link DagProcessingEngine}, {@link #next()} may block until a task is available and may return
+ * {@code null}, which the consumer skips.
+ */
+public interface DagTaskStream extends Iterator<DagTask> {
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
new file mode 100644
index 000000000..0b27b4e9d
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+
+/**
+ * Visitor over the concrete {@link org.apache.gobblin.service.modules.orchestration.task.DagTask} types; each task
+ * type gets its own {@code meet} overload.
+ *
+ * @param <T> the result produced by visiting a task
+ */
+public interface DagTaskVisitor<T> {
+
+  /** Visits a {@link LaunchDagTask}. */
+  T meet(LaunchDagTask launchDagTask);
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index d484ff8f8..48b789d11 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -23,15 +23,20 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Lists;
+import com.google.inject.Inject;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -54,21 +59,50 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
// dagToJobs holds a map of dagId to running jobs of that dag
private final Map<DagManager.DagId,
LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new
ConcurrentHashMap<>();
private final Map<DagManager.DagId, Long> dagToDeadline = new
ConcurrentHashMap<>();
- private final DagStateStore dagStateStore;
- private final DagStateStore failedDagStateStore;
+ private DagStateStore dagStateStore;
+ private DagStateStore failedDagStateStore;
+ private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
+ Map<URI, TopologySpec> topologySpecMap;
+ private final Config config;
private static final String FAILED_DAG_STATESTORE_PREFIX =
"failedDagStateStore";
public static final String DAG_STATESTORE_CLASS_KEY =
DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ FlowCatalog flowCatalog;
- public MostlyMySqlDagManagementStateStore(Config config, Map<URI,
TopologySpec> topologySpecMap) throws IOException {
- this.dagStateStore = createDagStateStore(config, topologySpecMap);
- this.failedDagStateStore = createDagStateStore(
- ConfigUtils.getConfigOrEmpty(config,
FAILED_DAG_STATESTORE_PREFIX).withFallback(config), topologySpecMap);
+  /**
+   * Creates the state store and its quota manager. The dag state stores are not created here: {@link #start()}
+   * creates them once the topology spec map has been provided via {@link #setTopologySpecMap(Map)}.
+   *
+   * @param config service config, also used later by {@link #start()} to create the dag state stores
+   * @param flowCatalog catalog backing {@link #getFlowSpec(URI)} and {@link #removeFlowSpec(URI, Properties, boolean)}
+   */
+ @Inject
+ public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog) throws IOException {
this.quotaManager = new MysqlUserQuotaManager(config);
- this.quotaManager.init(getDags());
+  // Quota is initialized from the stored dags in start(), after the dag state stores exist.
+ this.config = config;
+ this.flowCatalog = flowCatalog;
+ }
+
+ // It should be called after topology spec map is set
+ public synchronized void start() throws IOException {
+ if (!dagStoresInitialized) {
+ this.dagStateStore = createDagStateStore(config, topologySpecMap);
+ this.failedDagStateStore =
createDagStateStore(ConfigUtils.getConfigOrEmpty(config,
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
+ topologySpecMap);
+ initQuota(getDags());
+ dagStoresInitialized = true;
+ }
+ }
+
+ @Override
+ public FlowSpec getFlowSpec(URI uri) throws SpecNotFoundException {
+ return this.flowCatalog.getSpecs(uri);
+ }
+
+  /**
+   * Removes the flow spec for {@code uri} from the backing {@link FlowCatalog}, forwarding {@code headers} and
+   * {@code triggerListener} unchanged.
+   */
+  @Override
+  public void removeFlowSpec(URI uri, Properties headers, boolean triggerListener) {
+    flowCatalog.remove(uri, headers, triggerListener);
+  }
+
+  /**
+   * Stores the topology spec map and then calls {@link #start()}. The dag state stores are built from this map
+   * only on the first successful start.
+   */
+  public synchronized void setTopologySpecMap(Map<URI, TopologySpec> topologySpecMap) throws IOException {
+    this.topologySpecMap = topologySpecMap;
+    this.start();
}
- DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) {
+ private DagStateStore createDagStateStore(Config config, Map<URI,
TopologySpec> topologySpecMap) {
try {
Class<?> dagStateStoreClass =
Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY,
MysqlDagStateStore.class.getName()));
return (DagStateStore)
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config,
topologySpecMap);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
new file mode 100644
index 000000000..94e14387a
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Base class for the unit of work that carries out a single {@link DagTask}.
+ * <p>
+ * {@link #process(DagManagementStateStore)} is a fixed template. {@link #initialize} loads the state the proc needs,
+ * and {@link #act} performs the work on that state. {@link #commit} is a hook for persisting the outcome (a no-op
+ * by default). {@link #sendNotification} emits events about the outcome through the shared {@link EventSubmitter}.
+ *
+ * @param <S> type of the state produced by {@link #initialize} and consumed by {@link #act}
+ * @param <T> type of the result produced by {@link #act}
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, T> {
+  protected static final MetricContext metricContext =
+      Instrumented.getMetricContext(new State(), DagProc.class);
+  protected static final EventSubmitter eventSubmitter =
+      new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
+
+  /**
+   * Runs initialize, act, commit and sendNotification, in that order. None of the steps is retried yet.
+   *
+   * @throws IOException propagated from whichever step fails; later steps are then skipped
+   */
+  public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
+    // todo - retry each of the steps below
+    T outcome = act(dagManagementStateStore, initialize(dagManagementStateStore));
+    commit(dagManagementStateStore, outcome);
+    sendNotification(outcome, eventSubmitter);
+  }
+
+  /** Loads whatever state {@link #act} needs from {@code dagManagementStateStore}. */
+  protected abstract S initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
+
+  /** Performs the proc's work on {@code state} and returns its outcome. */
+  protected abstract T act(DagManagementStateStore dagManagementStateStore, S state) throws IOException;
+
+  /** Emits notifications describing {@code result} via {@code eventSubmitter}. */
+  protected abstract void sendNotification(T result, EventSubmitter eventSubmitter) throws IOException;
+
+  /**
+   * Hook for persisting the outcome of {@link #act}; intentionally empty here.
+   * todo - commit the modified dags to the persistent store, maybe not required for InMem dagManagementStateStore
+   */
+  protected void commit(DagManagementStateStore dagManagementStateStore, T result) {
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
new file mode 100644
index 000000000..72a59ee0e
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation for {@link LaunchDagTask}
+ */
+@Slf4j
+@Alpha
+public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
+ private final LaunchDagTask launchDagTask;
+ private final AtomicLong orchestrationDelayCounter;
+
+ public LaunchDagProc(LaunchDagTask launchDagTask) {
+ this.launchDagTask = launchDagTask;
+ this.orchestrationDelayCounter = new AtomicLong(0);
+ ContextAwareGauge<Long> orchestrationDelayMetric =
this.metricContext.newContextAwareGauge
+ (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
+ this.metricContext.register(orchestrationDelayMetric);
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
+ throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ @Override
+ protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ @Override
+ protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter)
+ throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ @Override
+ protected void commit(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
new file mode 100644
index 000000000..114864e64
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import java.io.IOException;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+
+
+/**
+ * A unit of work for one {@link DagActionStore.DagAction}. It is identified by a {@link DagManager.DagId} built from
+ * the action's flow group, flow name and flow execution id.
+ * <p>
+ * After the matching {@link DagProc#process(DagManagementStateStore)} has completed successfully,
+ * {@link DagTask#conclude()} must be called.
+ */
+@Alpha
+public abstract class DagTask {
+  @Getter public final DagActionStore.DagAction dagAction;
+  @Getter protected final DagManager.DagId dagId;
+  // not read yet; presumably needed once conclude() releases the lease (see the todo in LaunchDagTask)
+  private final MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus;
+
+  public DagTask(DagActionStore.DagAction dagAction,
+      MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    this.dagAction = dagAction;
+    this.leaseObtainedStatus = leaseObtainedStatus;
+    this.dagId = DagManagerUtils.generateDagId(
+        dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId());
+  }
+
+  /** Accepts {@code visitor} (double dispatch) and returns whatever the visitor produces for this task type. */
+  public abstract <T> T host(DagTaskVisitor<T> visitor);
+
+  /**
+   * Performs any cleanup for this task, e.g. releasing a lease acquired earlier.
+   *
+   * @return true if concluding the task succeeded, false otherwise
+   * @throws IOException if cleanup fails
+   */
+  // todo call it from the right place
+  public abstract boolean conclude() throws IOException;
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
new file mode 100644
index 000000000..5093eaf3e
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.task;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+
+
+/**
+ * A {@link DagTask} that handles a request to launch a dag.
+ */
+public class LaunchDagTask extends DagTask {
+  public LaunchDagTask(DagActionStore.DagAction dagAction,
+      MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+    super(dagAction, leaseObtainedStatus);
+  }
+
+  /** Dispatches to the visitor's {@code meet} overload for {@link LaunchDagTask}. */
+  @Override
+  public <T> T host(DagTaskVisitor<T> visitor) {
+    return visitor.meet(this);
+  }
+
+  /**
+   * Performs no cleanup yet and always reports success.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean conclude() {
+    // todo - release lease
+    return true;
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index a6f11e08f..7260e45d2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -17,6 +17,13 @@
package org.apache.gobblin.service.monitoring;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
@@ -24,16 +31,12 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
@@ -59,13 +62,13 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX =
"dagActionChangeStore";
// Metrics
- private ContextAwareMeter killsInvoked;
- private ContextAwareMeter resumesInvoked;
+ protected ContextAwareMeter killsInvoked;
+ protected ContextAwareMeter resumesInvoked;
private ContextAwareMeter successfulLaunchSubmissions;
private ContextAwareMeter failedLaunchSubmissions;
private ContextAwareMeter successfulLaunchSubmissionsOnStartup;
private ContextAwareMeter failedLaunchSubmissionsOnStartup;
- private ContextAwareMeter unexpectedErrors;
+ protected ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
private ContextAwareMeter duplicateMessagesMeter;
private ContextAwareMeter heartbeatMessagesMeter;
@@ -230,7 +233,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
* For a given dagAction, calls the appropriate method in the DagManager to
carry out the desired action.
* @param isStartup true if called for dagAction loaded directly from store
upon startup, false otherwise
*/
- private void handleDagAction(DagActionStore.DagAction dagAction, boolean
isStartup) {
+ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean
isStartup) {
log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction
change ({}) received for flow: {}",
dagAction.getFlowActionType(), dagAction);
if
(dagAction.getFlowActionType().equals(DagActionStore.FlowActionType.RESUME)) {
@@ -342,7 +345,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
@Data
@AllArgsConstructor
- private static class LaunchSubmissionMetricProxy {
+ protected static class LaunchSubmissionMetricProxy {
private ContextAwareMeter successMeter;
private ContextAwareMeter failureMeter;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
new file mode 100644
index 000000000..6bfc92045
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+
+
+/**
+ * A {@link DagActionStoreChangeMonitor} that should be used when
+ * {@link org.apache.gobblin.service.ServiceConfigKeys#DAG_PROCESSING_ENGINE_ENABLED} is set. It forwards dag actions
+ * to {@link DagManagement} rather than resolving a {@link org.apache.gobblin.runtime.api.FlowSpec} and handing it to
+ * the {@link Orchestrator}.
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
+  private final DagManagement dagManagement;
+  protected ContextAwareMeter unexpectedLaunchEventErrors;
+
+  // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the
+  // consumer client itself to determine all Kafka related information dynamically rather than through the config.
+  public DagManagementDagActionStoreChangeMonitor(Config config, int numThreads,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore,
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+    // DagManager is only needed in the `handleDagAction` method of its parent class and not needed in this class,
+    // so we are passing a null value for DagManager to its parent class.
+    super("", config, null, numThreads, flowCatalog, orchestrator, dagActionStore, isMultiActiveSchedulerEnabled);
+    this.dagManagement = dagManagement;
+  }
+
+  /**
+   * Passes the {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} to {@link DagManagement} instead of
+   * finding a {@link org.apache.gobblin.runtime.api.FlowSpec} and passing the spec to {@link Orchestrator}.
+   * Only LAUNCH actions are handled so far; any other action type is logged and counted as an unexpected error.
+   *
+   * @param dagAction the action received from the dag action store
+   * @param isStartup true if the action was loaded directly from the store upon startup, false otherwise; selects
+   *                  which launch-submission meters are updated
+   * @throws RuntimeException if a LAUNCH action arrives while the multi-active scheduler is disabled
+   */
+  @Override
+  protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isStartup) {
+    log.info("(" + (isStartup ? "on-startup" : "post-startup") + ") DagAction change ({}) received for flow: {}",
+        dagAction.getFlowActionType(), dagAction);
+    LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ? ON_STARTUP : POST_STARTUP;
+    try {
+      // todo - add actions for other types of dag actions
+      if (dagAction.getFlowActionType().equals(DagActionStore.FlowActionType.LAUNCH)) {
+        // If multi-active scheduler is NOT turned on we should not receive these type of events
+        if (!this.isMultiActiveSchedulerEnabled) {
+          this.unexpectedLaunchEventErrors.mark();
+          throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
+              + "mode for flowAction: %s", dagAction));
+        }
+        dagManagement.addDagAction(dagAction);
+        // failures below are counted, so successes must be too for the two meters to be comparable
+        launchSubmissionMetricProxy.markSuccess();
+      } else {
+        log.warn("Received unsupported dagAction {}. Expected to be a LAUNCH", dagAction.getFlowActionType());
+        this.unexpectedErrors.mark();
+      }
+    } catch (IOException e) {
+      // pass the exception itself (not just its message) so the stack trace is logged
+      log.warn("Failed to addDagAction for flowId {}", dagAction.getFlowId(), e);
+      launchSubmissionMetricProxy.markFailure();
+    }
+  }
+
+  /** Creates the parent's metrics plus the meter for LAUNCH actions received outside multi-active mode. */
+  @Override
+  protected void createMetrics() {
+    super.createMetrics();
+    // Dag Action specific metrics
+    this.unexpectedLaunchEventErrors =
+        this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
new file mode 100644
index 000000000..fbf6bc336
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.Objects;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Provider;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A Guice {@link Provider} that builds a {@link DagManagementDagActionStoreChangeMonitor} from the
+ * {@code dagActionChangeStore} sub-config and initializes it before returning it.
+ */
+@Slf4j
+public class DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements Provider<DagActionStoreChangeMonitor> {
+  static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
+
+  private final Config config;
+  private final FlowCatalog flowCatalog;
+  private final Orchestrator orchestrator;
+  private final DagActionStore dagActionStore;
+  private final boolean isMultiActiveSchedulerEnabled;
+  private final DagManagement dagManagement;
+
+  /**
+   * @param config service config; must not be null
+   * @param dagManager not used by this factory. NOTE(review): it appears to be kept only so that the injected
+   *                   constructor signature stays the same
+   */
+  @Inject
+  public DagProcEngineEnabledDagActionStoreChangeMonitorFactory(Config config, DagManager dagManager,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, DagActionStore dagActionStore, DagManagement dagManagement,
+      @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean isMultiActiveSchedulerEnabled) {
+    this.config = Objects.requireNonNull(config);
+    this.dagManagement = dagManagement;
+    this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
+    this.dagActionStore = dagActionStore;
+    this.orchestrator = orchestrator;
+    this.flowCatalog = flowCatalog;
+  }
+
+  /**
+   * Builds the monitor from the dag-action-change sub-config. The thread count defaults to 5.
+   * The {@code throws} clause is vestigial (nothing here is reflective); it is kept because {@link #get()} catches it.
+   */
+  private DagManagementDagActionStoreChangeMonitor createDagActionStoreMonitor()
+      throws ReflectiveOperationException {
+    Config monitorConfig = this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+    log.info("DagActionStore will be initialized with config {}", monitorConfig);
+    return new DagManagementDagActionStoreChangeMonitor(monitorConfig,
+        ConfigUtils.getInt(monitorConfig, DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5),
+        this.flowCatalog, this.orchestrator, this.dagActionStore, this.isMultiActiveSchedulerEnabled,
+        this.dagManagement);
+  }
+
+  /** Creates and initializes a new monitor; reflective failures are rethrown unchecked. */
+  @Override
+  public DagActionStoreChangeMonitor get() {
+    DagActionStoreChangeMonitor monitor;
+    try {
+      monitor = createDagActionStoreMonitor();
+      monitor.initializeMonitor();
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Failed to initialize DagActionStoreMonitor due to ", e);
+    }
+    return monitor;
+  }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
new file mode 100644
index 000000000..c9998f046
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
+
+/**
+ * Exercises {@link DagManagementTaskStreamImpl} against a test metastore database: a dag action added to the stream
+ * comes back as a {@link DagTask} that a {@link DagProcFactory} can turn into a {@link DagProc}.
+ */
+public class DagManagementTaskStreamImplTest {
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
+  static ITestMetastoreDatabase testMetastoreDatabase;
+  DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+  DagManagementTaskStreamImpl dagManagementTaskStream;
+  DagProcFactory dagProcFactory;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Setting up mock DB
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+    Config config;
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
+            MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
+            testMetastoreDatabase.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE);
+    config = configBuilder.build();
+
+    // Constructing TopologySpecMap.
+    Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+    String specExecInstance = "mySpecExecutor";
+    TopologySpec topologySpec = DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+    URI specExecURI = new URI(specExecInstance);
+    topologySpecMap.put(specExecURI, topologySpec);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config, null);
+    dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    this.dagManagementTaskStream = new DagManagementTaskStreamImpl(config, Optional.empty());
+    this.dagProcFactory = new DagProcFactory();
+    this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+        this.dagManagementTaskStream, this.dagProcFactory, dagManagementStateStore);
+  }
+
+  /** Releases the test metastore database acquired in {@link #setUp()}, even if setup or a test failed. */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    if (testMetastoreDatabase != null) {
+      testMetastoreDatabase.close();
+    }
+  }
+
+  // This tests adding and removal of dag actions from dag task stream
+  // when we have different dag procs in future, we can test dag processing and exception handling
+  @Test
+  public void addRemoveDagActions() throws IOException {
+    dagManagementTaskStream.addDagAction(
+        new DagActionStore.DagAction("fg", "fn", "12345", DagActionStore.FlowActionType.LAUNCH));
+    DagTask dagTask = dagManagementTaskStream.next();
+    Assert.assertTrue(dagTask instanceof LaunchDagTask);
+    DagProc<?, ?> dagProc = dagTask.host(this.dagProcFactory);
+    Assert.assertNotNull(dagProc);
+  }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
new file mode 100644
index 000000000..11a74d7db
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.testing.AssertWithBackoff;

import static org.mockito.Mockito.spy;
+
+@Slf4j
+public class DagProcessingEngineTest {
+ private static final String TEST_USER = "testUser";
+ private static final String TEST_PASSWORD = "testPassword";
+ private static final String TEST_TABLE = "quotas";
+ static ITestMetastoreDatabase testMetastoreDatabase;
+ DagProcessingEngine.DagProcEngineThread dagProcEngineThread;
+ DagManagementTaskStreamImpl dagManagementTaskStream;
+ DagProcessingEngine dagProcessingEngine;
+ DagTaskStream dagTaskStream;
+ DagProcFactory dagProcFactory;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ // Setting up mock DB
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+
+ Config config;
+ ConfigBuilder configBuilder = ConfigBuilder.create();
+
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
+
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
+ config = configBuilder.build();
+
+ // Constructing TopologySpecMap.
+ Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
+ String specExecInstance = "mySpecExecutor";
+ TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
+ URI specExecURI = new URI(specExecInstance);
+ topologySpecMap.put(specExecURI, topologySpec);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null);
+ dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+ this.dagManagementTaskStream =
+ new DagManagementTaskStreamImpl(config, Optional.empty());
+ this.dagProcFactory = new DagProcFactory();
+ this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
+ this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore);
+ this.dagTaskStream = spy(new MockedDagTaskStream());
+ this.dagProcessingEngine =
+ new DagProcessingEngine(config, dagTaskStream, this.dagProcFactory,
dagManagementStateStore);
+ }
+
+ static class MockedDagTaskStream implements DagTaskStream {
+ public static final int MAX_NUM_OF_TASKS = 1000;
+ public static final int FAILING_DAGS_FREQUENCY = 10;
+ volatile int i=0;
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+ @Override
+ public synchronized DagTask next() throws NoSuchElementException {
+ i++;
+ if (i > MAX_NUM_OF_TASKS) {
+ throw new RuntimeException("Simulating an exception to stop the
thread!");
+ }
+ if (i % FAILING_DAGS_FREQUENCY == 0 ) {
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), true);
+ } else {
+ return new MockedDagTask(new DagActionStore.DagAction("fg-" + i, "fn-"
+ i, "1234" + i, DagActionStore.FlowActionType.LAUNCH), false);
+ }
+ }
+ }
+
+ static class MockedDagTask extends DagTask {
+ private final boolean isBad;
+
+ public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
+ super(dagAction, null);
+ this.isBad = isBad;
+ }
+
+ @Override
+ public <T> T host(DagTaskVisitor<T> visitor) {
+ return (T) new MockedDagProc(this.isBad);
+ }
+
+ @Override
+ public boolean conclude() {
+ return false;
+ }
+ }
+
+ static class MockedDagProc extends DagProc<Void, Void> {
+ private final boolean isBad;
+ public MockedDagProc(boolean isBad) {
+ this.isBad = isBad;
+ }
+
+ @Override
+ protected Void initialize(DagManagementStateStore dagManagementStateStore)
{
+ return null;
+ }
+
+ @Override
+ protected Void act(DagManagementStateStore dagManagementStateStore, Void
state) {
+ if (this.isBad) {
+ throw new RuntimeException("Simulating an exception!");
+ }
+ return null;
+ }
+
+ @Override
+ protected void sendNotification(Void result, EventSubmitter
eventSubmitter) {
+ }
+
+ @Override
+ protected void commit(DagManagementStateStore dagManagementStateStore,
Void result) {
+ }
+ }
+
+ // This tests verifies that all the dag tasks entered to the dag task stream
are retrieved by dag proc engine threads
+ @Test
+ public void dagProcessingTest() throws InterruptedException,
TimeoutException {
+ // there are MAX_NUM_OF_TASKS dag tasks returned and then each thread
additionally call (infinitely) once to wait
+ // in this unit tests, it does not infinitely wait though, because the
mocked task stream throws an exception on
+ // (MAX_NUM_OF_TASKS + 1) th call
+ int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS +
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
+ int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS /
MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
+
+ AssertWithBackoff.assertTrue(input ->
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() ==
expectedNumOfInvocations,
+ 10000L, "dagTaskStream was not called " + expectedNumOfInvocations + "
number of times",
+ log, 1, 1000L);
+
+
Assert.assertEquals(DagManagementTaskStreamImpl.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 4e1b914d1..0a8e421df 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -47,7 +47,7 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
*/
public class MostlyMySqlDagManagementStateStoreTest {
- private DagManagementStateStore dagManagementStateStore;
+ private MostlyMySqlDagManagementStateStore dagManagementStateStore;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_DAG_STATE_STORE = "TestDagStateStore";
@@ -75,7 +75,9 @@ public class MostlyMySqlDagManagementStateStoreTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, topologySpecMap);
+ this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null);
+ this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+ this.dagManagementStateStore.start();
}
@Test
@@ -128,8 +130,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
@Override
protected StateStore<State> createStateStore(Config config) {
try {
-
- String jdbcUrl =
MostlyMySqlDagManagementStateStoreTest.testMetastoreDatabase.getJdbcUrl();
+ String jdbcUrl =
config.getString(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);