This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 54f87cb72 [GOBBLIN-2029] Codestyle changes in DagProc and related
classes (#3906)
54f87cb72 is described below
commit 54f87cb72172b2ecf119ea5818a6a055ada77e97
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Mar 29 16:26:51 2024 -0700
[GOBBLIN-2029] Codestyle changes in DagProc and related classes (#3906)
* some codestyle changes
* address review comments
---
.../runtime/DagActionStoreChangeMonitorTest.java | 12 ++-
.../runtime/scheduler/QuartzJobSpecScheduler.java | 4 +-
.../spec_executorInstance/MockedSpecExecutor.java | 2 +-
.../modules/core/GobblinServiceGuiceModule.java | 2 +-
.../orchestration/DagManagementStateStore.java | 6 ++
.../orchestration/DagManagementTaskStreamImpl.java | 2 +-
.../service/modules/orchestration/DagManager.java | 5 +
.../modules/orchestration/DagProcessingEngine.java | 2 +-
.../MostlyMySqlDagManagementStateStore.java | 5 +
.../MysqlMultiActiveLeaseArbiter.java | 2 +-
.../modules/orchestration/proc/DagProc.java | 38 +++----
.../proc/{LaunchDagProc.java => DagProcUtils.java} | 109 +++-----------------
.../modules/orchestration/proc/LaunchDagProc.java | 113 +++------------------
.../modules/orchestration/task/DagTask.java | 4 -
...lowExecutionResourceHandlerWithWarmStandby.java | 14 ++-
.../modules/spec/JobExecutionPlanDagFactory.java | 2 +-
.../spec/JobExecutionPlanListDeserializer.java | 1 -
.../monitoring/DagActionStoreChangeMonitor.java | 2 +-
.../DagActionStoreChangeMonitorFactory.java | 15 +--
.../DagManagementDagActionStoreChangeMonitor.java | 30 ++----
...nagementDagActionStoreChangeMonitorFactory.java | 15 +--
.../service/monitoring/KafkaJobStatusMonitor.java | 13 ++-
.../modules/orchestration/DagManagerTest.java | 5 +
.../orchestration/DagProcessingEngineTest.java | 21 ++--
.../orchestration/FlowLaunchHandlerTest.java | 6 +-
.../MostlyMySqlDagManagementStateStoreTest.java | 3 +-
.../orchestration/proc/LaunchDagProcTest.java | 2 +-
27 files changed, 135 insertions(+), 300 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 78083c065..0575a99bd 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -17,17 +17,23 @@
package org.apache.gobblin.runtime;
+import java.net.URI;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.annotations.Test;
+
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-import java.net.URI;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
@@ -35,8 +41,6 @@ import
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionValue;
import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
import org.apache.gobblin.service.monitoring.OperationType;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
index 4e46cee70..a3deb4157 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/scheduler/QuartzJobSpecScheduler.java
@@ -42,6 +42,8 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.Data;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.JobSpec;
@@ -50,8 +52,6 @@ import org.apache.gobblin.runtime.api.JobSpecScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.service.StandardServiceConfig;
-import lombok.Data;
-
/**
* A {@link JobSpecScheduler} using Quartz.
*/
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
index 7c44b9833..378d04717 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_executorInstance/MockedSpecExecutor.java
@@ -45,7 +45,7 @@ public class MockedSpecExecutor extends InMemorySpecExecutor {
when(mockedSpecProducer.addSpec(any())).thenReturn(new
CompletedFuture(Boolean.TRUE, null));
when(mockedSpecProducer.serializeAddSpecResponse(any())).thenReturn("");
when(mockedSpecProducer.deserializeAddSpecResponse(any())).thenReturn(new
CompletedFuture(Boolean.TRUE, null));
- }
+ }
public static SpecExecutor createDummySpecExecutor(URI uri) {
Properties properties = new Properties();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index ed179fa58..c2af7cb1a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -70,6 +70,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
@@ -78,7 +79,6 @@ import
org.apache.gobblin.service.modules.orchestration.DagProcFactory;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
-import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 86df6bee8..274abeaa0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -194,4 +194,10 @@ public interface DagManagementStateStore {
* @return {@link JobStatus} or {@link Optional#empty} if not present in the
Job-Status Store
*/
Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
+
+ /**
+ * Returns true if the {@link Dag} identified by the given {@link
org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
+ * has any running job, false otherwise.
+ */
+ public boolean hasRunningJobs(DagManager.DagId dagId);
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 16d396394..4409a673f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -162,7 +162,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
case LAUNCH:
return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
default:
- throw new UnsupportedOperationException("Not yet implemented");
+ throw new UnsupportedOperationException(dagActionType + " not yet
implemented");
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 7fd90a244..218fe8128 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -172,6 +172,7 @@ public class DagManager extends AbstractIdleService {
String flowGroup;
String flowName;
String flowExecutionId;
+
public DagId(String flowGroup, String flowName, String flowExecutionId) {
this.flowGroup = flowGroup;
this.flowName = flowName;
@@ -186,6 +187,10 @@ public class DagManager extends AbstractIdleService {
DagActionStore.DagAction toDagAction(DagActionStore.DagActionType
actionType) {
return new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, NO_JOB_NAME_DEFAULT, actionType);
}
+
+ public FlowId getFlowId() {
+ return new
FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
+ }
}
private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index aa4a5f0e0..cbcfa3704 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -97,7 +97,7 @@ public class DagProcessingEngine {
dagProc.process(dagManagementStateStore);
dagTask.conclude();
} catch (Exception e) {
- log.error("DagProcEngineThread encountered exception while
processing dag " + dagTask.getDagId(), e);
+ log.error("DagProcEngineThread encountered exception while
processing dag " + dagProc.getDagId(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
// todo mark lease success and releases it
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 029dd35b8..5304c8003 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -263,4 +263,9 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
return java.util.Optional.empty();
}
}
+
+ @Override
+ public boolean hasRunningJobs(DagManager.DagId dagId) {
+ return !getDagNodes(dagId).isEmpty();
+ }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 9dfea5acc..113027b24 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -490,7 +490,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
/**
* Parse result of attempted insert/update to obtain a lease for a
- * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} event by
selecting values corresponding to that
+ * {@link DagActionStore.DagAction} event by selecting values corresponding
to that
* event from the table to return the corresponding status based on
successful insert/update or not.
* @throws SQLException
* @throws IOException
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 6a5943445..4f0a254b5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
-import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -29,41 +28,44 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
/**
* Responsible for performing the actual work for a given {@link DagTask} by
first initializing its state, performing
- * actions based on the type of {@link DagTask} and finally submitting an
event to the executor.
+ * actions based on the type of {@link DagTask}. Submitting events in time is
found to be important in
+ * <a href="https://github.com/apache/gobblin/pull/3641">PR#3641</a>, hence
initialize and act methods submit events as
+ * they happen.
*/
@Alpha
@Data
@Slf4j
-public abstract class DagProc<S, T> {
+public abstract class DagProc<T> {
+ protected final DagTask dagTask;
+ @Getter protected final DagManager.DagId dagId;
+ @Getter protected final DagNodeId dagNodeId;
protected static final MetricContext metricContext =
Instrumented.getMetricContext(new State(), DagProc.class);
protected static final EventSubmitter eventSubmitter = new
EventSubmitter.Builder(
metricContext, "org.apache.gobblin.service").build();
- @Getter(AccessLevel.PROTECTED)
- private final DagTask dagTask;
- public final void process(DagManagementStateStore dagManagementStateStore)
throws IOException {
- S state = initialize(dagManagementStateStore); // todo - retry
- T result = act(dagManagementStateStore, state); // todo - retry
- sendNotification(result, eventSubmitter); // todo - retry
- log.info("{} successfully concluded actions for dagId : {}",
getClass().getSimpleName(), getDagId());
+ public DagProc(DagTask dagTask) {
+ this.dagTask = dagTask;
+ this.dagId =
DagManagerUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
+ this.dagTask.getDagAction().getFlowName(),
this.dagTask.getDagAction().getFlowExecutionId());
+ this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
}
- protected DagManager.DagId getDagId() {
- return this.dagTask.getDagId();
+ public final void process(DagManagementStateStore dagManagementStateStore)
throws IOException {
+ T state = initialize(dagManagementStateStore); // todo - retry
+ act(dagManagementStateStore, state); // todo - retry
+ log.info("{} concluded processing for dagId : {}",
getClass().getSimpleName(), this.dagId);
}
- protected abstract S initialize(DagManagementStateStore
dagManagementStateStore) throws IOException;
-
- protected abstract T act(DagManagementStateStore dagManagementStateStore, S
state) throws IOException;
-
- protected abstract void sendNotification(T result, EventSubmitter
eventSubmitter) throws IOException;
+ protected abstract T initialize(DagManagementStateStore
dagManagementStateStore) throws IOException;
- // todo - commit the modified dags to the persistent store, maybe not
required for InMem dagManagementStateStore
+ protected abstract void act(DagManagementStateStore dagManagementStateStore,
T state) throws IOException;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
similarity index 53%
copy from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
copy to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 6368d8887..9c87bb029 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -18,122 +18,43 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
-import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
/**
- * An implementation for {@link DagProc} that launches a new job.
+ * A class to group together all the utility methods for {@link DagProc}
derived class implementations.
*/
@Slf4j
-public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
- private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
- // todo - this is not orchestration delay and should be renamed. keeping it
the same because DagManager is also using
- // the same name
- private static final AtomicLong orchestrationDelayCounter = new
AtomicLong(0);
-
- public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
- super(launchDagTask);
- this.flowCompilationValidationHelper = flowCompilationValidationHelper;
- }
-
- static {
- metricContext.register(
-
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get));
- }
-
- @Override
- protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
- throws IOException {
- try {
- DagActionStore.DagAction dagAction = this.getDagTask().getDagAction();
- FlowSpec flowSpec = loadFlowSpec(dagManagementStateStore, dagAction);
- flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
- return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
- } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
- throw new RuntimeException(e);
- }
- }
-
- private FlowSpec loadFlowSpec(DagManagementStateStore
dagManagementStateStore, DagActionStore.DagAction dagAction)
- throws URISyntaxException, SpecNotFoundException {
- URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
- return dagManagementStateStore.getFlowSpec(flowUri);
- }
-
- @Override
- protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
- throws IOException {
- if (!dag.isPresent()) {
- log.warn("Dag with id " + getDagId() + " could not be compiled.");
- // todo - add metrics
- return Optional.empty();
- }
- submitNextNodes(dagManagementStateStore, dag.get());
- orchestrationDelayCounter.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag.get()));
- return dag;
- }
-
- /**
- * Submit next set of Dag nodes in the provided Dag.
- */
- private void submitNextNodes(DagManagementStateStore
dagManagementStateStore,
- Dag<JobExecutionPlan> dag) throws IOException {
- Set<Dag.DagNode<JobExecutionPlan>> nextNodes =
DagManagerUtils.getNext(dag);
-
- if (nextNodes.size() > 1) {
- handleMultipleJobs(nextNodes);
- }
-
- //Submit jobs from the dag ready for execution.
- for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
- submitJobToExecutor(dagManagementStateStore, dagNode);
- dagManagementStateStore.addDagNodeState(dagNode, getDagId());
- log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), getDagId());
- }
-
- //Checkpoint the dag state, it should have an updated value of dag nodes
- dagManagementStateStore.checkpointDag(dag);
- }
-
- private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
- throw new UnsupportedOperationException("More than one start job is not
allowed");
- }
+public class DagProcUtils {
/**
- * Submits a {@link JobSpec} to a {@link SpecExecutor}.
+ * - submits a {@link JobSpec} to a {@link SpecExecutor}
+ * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}
+ * that measures the time needed to submit the job to {@link SpecExecutor}
+ * - increment running jobs counter for the {@link Dag}, the proxy user that
submitted the job and the {@link SpecExecutor} job was sent to
+ * - add updated dag node state to dagManagementStateStore
*/
- private void submitJobToExecutor(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
+ public static void submitJobToExecutor(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode,
+ DagManager.DagId dagId) {
DagManagerUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
@@ -145,7 +66,8 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
SpecProducer<Spec> producer;
try {
producer = DagManagerUtils.getSpecProducer(dagNode);
- TimingEvent jobOrchestrationTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
+ // todo - submits an event with some other name, because it is not
really orchestration happening here
+ TimingEvent jobOrchestrationTimer =
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
// Increment job count before submitting the job onto the spec producer,
in case that throws an exception.
// By this point the quota is allocated, so it's imperative to increment
as missing would introduce the potential to decrement below zero upon quota
release.
@@ -171,8 +93,9 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
+ dagManagementStateStore.addDagNodeState(dagNode, dagId);
} catch (Exception e) {
- TimingEvent jobFailedTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
+ TimingEvent jobFailedTimer =
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " +
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " +
specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " +
e.getMessage());
@@ -188,8 +111,4 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
throw new RuntimeException(e);
}
}
-
- @Override
- protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter) {
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 6368d8887..4cef3b7a7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -18,35 +18,20 @@
package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.Maps;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecProducer;
-import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
-import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
@@ -56,52 +41,44 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
* An implementation for {@link DagProc} that launches a new job.
*/
@Slf4j
-public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
+public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
// todo - this is not orchestration delay and should be renamed. keeping it
the same because DagManager is also using
// the same name
private static final AtomicLong orchestrationDelayCounter = new
AtomicLong(0);
- public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
- super(launchDagTask);
- this.flowCompilationValidationHelper = flowCompilationValidationHelper;
- }
-
static {
metricContext.register(
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get));
}
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
+ super(launchDagTask);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+ }
+
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
try {
- DagActionStore.DagAction dagAction = this.getDagTask().getDagAction();
- FlowSpec flowSpec = loadFlowSpec(dagManagementStateStore, dagAction);
- flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ FlowSpec flowSpec =
dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId()));
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
getDagId().getFlowExecutionId());
return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
} catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
throw new RuntimeException(e);
}
}
- private FlowSpec loadFlowSpec(DagManagementStateStore
dagManagementStateStore, DagActionStore.DagAction dagAction)
- throws URISyntaxException, SpecNotFoundException {
- URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
- return dagManagementStateStore.getFlowSpec(flowUri);
- }
-
@Override
- protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore
dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
+ protected void act(DagManagementStateStore dagManagementStateStore,
Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
if (!dag.isPresent()) {
log.warn("Dag with id " + getDagId() + " could not be compiled.");
// todo - add metrics
- return Optional.empty();
+ } else {
+ submitNextNodes(dagManagementStateStore, dag.get());
+ orchestrationDelayCounter.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag.get()));
}
- submitNextNodes(dagManagementStateStore, dag.get());
- orchestrationDelayCounter.set(System.currentTimeMillis() -
DagManagerUtils.getFlowExecId(dag.get()));
- return dag;
}
/**
@@ -117,8 +94,7 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
//Submit jobs from the dag ready for execution.
for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
- submitJobToExecutor(dagManagementStateStore, dagNode);
- dagManagementStateStore.addDagNodeState(dagNode, getDagId());
+ DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
log.info("Submitted job {} for dagId {}",
DagManagerUtils.getJobName(dagNode), getDagId());
}
@@ -129,67 +105,4 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>>
nextNodes) {
throw new UnsupportedOperationException("More than one start job is not
allowed");
}
-
- /**
- * Submits a {@link JobSpec} to a {@link SpecExecutor}.
- */
- private void submitJobToExecutor(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
- DagManagerUtils.incrementJobAttempt(dagNode);
- JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
- JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
- Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-
- String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
-
- // Run this spec on selected executor
- SpecProducer<Spec> producer;
- try {
- producer = DagManagerUtils.getSpecProducer(dagNode);
- TimingEvent jobOrchestrationTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
-
- // Increment job count before submitting the job onto the spec producer,
in case that throws an exception.
- // By this point the quota is allocated, so it's imperative to increment
as missing would introduce the potential to decrement below zero upon quota
release.
- // Quota release is guaranteed, despite failure, because exception
handling within would mark the job FAILED.
- // When the ensuing kafka message spurs DagManager processing, the quota
is released and the counts decremented
- // Ensure that we do not double increment for flows that are retried
- if (DagManagerUtils.getJobExecutionPlan(dagNode).getCurrentAttempts() ==
1) {
-
dagManagementStateStore.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
- }
- // Submit the job to the SpecProducer, which in turn performs the actual
job submission to the SpecExecutor instance.
- // The SpecProducer implementations submit the job to the underlying
executor and return when the submission is complete,
- // either successfully or unsuccessfully. To catch any exceptions in the
job submission, the DagManagerThread
- // blocks (by calling Future#get()) until the submission is completed.
- dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
- Future<?> addSpecFuture = producer.addSpec(jobSpec);
- // todo - we should add future.get() instead of the complete future into
the JobExecutionPlan
-
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
- addSpecFuture.get();
- jobExecutionPlan.setExecutionStatus(ExecutionStatus.ORCHESTRATED);
- jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
- // Add serialized job properties as part of the orchestrated job event
metadata
- jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
dagNode.getValue().toString());
- jobOrchestrationTimer.stop(jobMetadata);
- log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
-
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
- } catch (Exception e) {
- TimingEvent jobFailedTimer =
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
- String message = "Cannot submit job " +
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " +
specExecutorUri;
- log.error(message, e);
- jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " +
e.getMessage());
- if (jobFailedTimer != null) {
- jobFailedTimer.stop(jobMetadata);
- }
- try {
- // when there is no exception, quota will be released in job status
monitor or re-evaluate dag proc
- dagManagementStateStore.releaseQuota(dagNode);
- } catch (IOException ex) {
- log.error("Could not release quota while handling e", ex);
- }
- throw new RuntimeException(e);
- }
- }
-
- @Override
- protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter) {
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 732b9a226..bc12926c7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -25,8 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@@ -44,14 +42,12 @@ public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
private final DagActionStore dagActionStore;
- @Getter protected final DagManager.DagId dagId;
public DagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
DagActionStore dagActionStore) {
this.dagAction = dagAction;
this.leaseObtainedStatus = leaseObtainedStatus;
this.dagActionStore = dagActionStore;
- this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId());
}
public abstract <T> T host(DagTaskVisitor<T> visitor);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index f792ace15..caee99e59 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -17,6 +17,12 @@
package org.apache.gobblin.service.modules.restli;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+
import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
import com.google.inject.Inject;
@@ -25,17 +31,15 @@ import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
-import java.io.IOException;
-import java.sql.SQLException;
+
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.FlowStatusId;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
-import org.apache.helix.HelixManager;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@Slf4j
public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends
GobblinServiceFlowExecutionResourceHandler{
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
index 8af2ee7a8..d96c67619 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -17,13 +17,13 @@
package org.apache.gobblin.service.modules.spec;
-import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
index ede2870a1..c51a9f378 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanListDeserializer.java
@@ -120,7 +120,6 @@ public class JobExecutionPlanListDeserializer implements
JsonDeserializer<List<J
String jobExecutionFuture =
serializedJobExecutionPlan.get(SerializationConstants.JOB_EXECUTION_FUTURE).getAsString();
Future future =
specExecutor.getProducer().get().deserializeAddSpecResponse(jobExecutionFuture);
jobExecutionPlan.setJobFuture(Optional.fromNullable(future));
-
} catch (ExecutionException | InterruptedException e) {
log.warn("Error during deserialization of JobExecutionFuture.");
throw new RuntimeException(e);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 68fded2e0..40bbb06bf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -41,13 +41,13 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 07e4229f8..6277360e9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -26,9 +26,9 @@ import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.util.ConfigUtils;
@@ -60,8 +60,7 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
}
- private DagActionStoreChangeMonitor createDagActionStoreMonitor()
- throws ReflectiveOperationException {
+ private DagActionStoreChangeMonitor createDagActionStoreMonitor() {
Config dagActionStoreChangeConfig =
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
log.info("DagActionStore will be initialized with config {}",
dagActionStoreChangeConfig);
@@ -74,12 +73,8 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
@Override
public DagActionStoreChangeMonitor get() {
- try {
- DagActionStoreChangeMonitor changeMonitor =
createDagActionStoreMonitor();
- changeMonitor.initializeMonitor();
- return changeMonitor;
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException("Failed to initialize DagActionStoreMonitor
due to ", e);
- }
+ DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
+ changeMonitor.initializeMonitor();
+ return changeMonitor;
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index 928266602..e7843f1af 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -23,10 +23,8 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -38,7 +36,6 @@ import
org.apache.gobblin.service.modules.orchestration.Orchestrator;
@Slf4j
public class DagManagementDagActionStoreChangeMonitor extends
DagActionStoreChangeMonitor {
private final DagManagement dagManagement;
- protected ContextAwareMeter unexpectedLaunchEventErrors;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
@@ -62,28 +59,17 @@ public class DagManagementDagActionStoreChangeMonitor
extends DagActionStoreChan
LaunchSubmissionMetricProxy launchSubmissionMetricProxy = isStartup ?
ON_STARTUP : POST_STARTUP;
try {
// todo - add actions for other other type of dag actions
- if
(dagAction.getDagActionType().equals(DagActionStore.DagActionType.LAUNCH)) {
- // If multi-active scheduler is NOT turned on we should not receive
these type of events
- if (!this.isMultiActiveSchedulerEnabled) {
- this.unexpectedLaunchEventErrors.mark();
- throw new RuntimeException(String.format("Received LAUNCH dagAction
while not in multi-active scheduler "
- + "mode for flowAction: %s", dagAction));
- }
- dagManagement.addDagAction(dagAction);
- } else {
- log.warn("Received unsupported dagAction {}. Expected to be a KILL,
RESUME, or LAUNCH", dagAction.getDagActionType());
- this.unexpectedErrors.mark();
+ switch (dagAction.getDagActionType()) {
+ case LAUNCH :
+ dagManagement.addDagAction(dagAction);
+ break;
+ default:
+ log.warn("Received unsupported dagAction {}. Expected to be a
LAUNCH", dagAction.getDagActionType());
+ this.unexpectedErrors.mark();
}
} catch (IOException e) {
log.warn("Failed to addDagAction for flowId {} due to exception {}",
dagAction.getFlowId(), e.getMessage());
launchSubmissionMetricProxy.markFailure();
}
}
-
- @Override
- protected void createMetrics() {
- super.createMetrics();
- // Dag Action specific metrics
- this.unexpectedLaunchEventErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 6853ed79f..af5146027 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -26,9 +26,9 @@ import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -61,8 +61,7 @@ public class DagManagementDagActionStoreChangeMonitorFactory
implements Provider
this.dagManagement = dagManagement;
}
- private DagManagementDagActionStoreChangeMonitor
createDagActionStoreMonitor()
- throws ReflectiveOperationException {
+ private DagManagementDagActionStoreChangeMonitor
createDagActionStoreMonitor() {
Config dagActionStoreChangeConfig =
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
log.info("DagActionStore will be initialized with config {}",
dagActionStoreChangeConfig);
@@ -74,12 +73,8 @@ public class DagManagementDagActionStoreChangeMonitorFactory
implements Provider
@Override
public DagActionStoreChangeMonitor get() {
- try {
- DagActionStoreChangeMonitor changeMonitor =
createDagActionStoreMonitor();
- changeMonitor.initializeMonitor();
- return changeMonitor;
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException("Failed to initialize DagActionStoreMonitor
due to ", e);
- }
+ DagActionStoreChangeMonitor changeMonitor = createDagActionStoreMonitor();
+ changeMonitor.initializeMonitor();
+ return changeMonitor;
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 87e1c3b5a..8ae588231 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -30,9 +30,10 @@ import java.util.concurrent.TimeUnit;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.rholder.retry.Attempt;
-import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@@ -40,7 +41,6 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -64,7 +64,10 @@ import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.retry.RetryerFactory;
-import static org.apache.gobblin.util.retry.RetryerFactory.*;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
/**
@@ -241,7 +244,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
String tableName = jobStatusTableName(flowExecutionId, jobGroup,
jobName);
List<org.apache.gobblin.configuration.State> states =
stateStore.getAll(storeName, tableName);
- if (states.size() > 0) {
+ if (!states.isEmpty()) {
org.apache.gobblin.configuration.State previousJobStatus =
states.get(states.size() - 1);
String previousStatus =
previousJobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
String currentStatus =
jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
@@ -305,7 +308,7 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
}
static boolean
isNewStateTransitionToFinal(org.apache.gobblin.configuration.State
currentState, List<org.apache.gobblin.configuration.State> prevStates) {
- if (prevStates.size() == 0) {
+ if (prevStates.isEmpty()) {
return
FlowStatusGenerator.FINISHED_STATUSES.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
}
return currentState.contains(JobStatusRetriever.EVENT_NAME_FIELD) &&
FlowStatusGenerator.FINISHED_STATUSES.contains(currentState.getProp(JobStatusRetriever.EVENT_NAME_FIELD))
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index b9a9d33e5..8370183e8 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -154,6 +154,11 @@ public class DagManagerTest {
public static Dag<JobExecutionPlan> buildDag(String id, Long
flowExecutionId, String flowFailureOption, int numNodes, String proxyUser,
Config additionalConfig)
throws URISyntaxException {
+ if (additionalConfig.hasPath(ConfigurationKeys.JOB_NAME_KEY)) {
+ throw new RuntimeException("Please do not set " +
ConfigurationKeys.JOB_NAME_KEY + " because this method is "
+ + "using hard coded job names in setting " +
ConfigurationKeys.JOB_DEPENDENCIES);
+ }
+
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 138ddb3a2..f5ed96867 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -38,7 +37,6 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@@ -84,6 +82,7 @@ public class DagProcessingEngineTest {
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class), Optional.empty(), false);
this.dagProcFactory = new DagProcFactory(null);
+
DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
new
DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream,
this.dagProcFactory,
dagManagementStateStore);
@@ -127,7 +126,7 @@ public class DagProcessingEngineTest {
@Override
public <T> T host(DagTaskVisitor<T> visitor) {
- return (T) new MockedDagProc(this.isBad);
+ return (T) new MockedDagProc(this, this.isBad);
}
@Override
@@ -136,15 +135,16 @@ public class DagProcessingEngineTest {
}
}
- static class MockedDagProc extends DagProc<Void, Void> {
+ static class MockedDagProc extends DagProc<Void> {
private final boolean isBad;
- public MockedDagProc(boolean isBad) {
- super(null);
+
+ public MockedDagProc(MockedDagTask mockedDagTask, boolean isBad) {
+ super(mockedDagTask);
this.isBad = isBad;
}
@Override
- protected DagManager.DagId getDagId() {
+ public DagManager.DagId getDagId() {
return new DagManager.DagId("fg", "fn", "12345");
}
@@ -154,15 +154,10 @@ public class DagProcessingEngineTest {
}
@Override
- protected Void act(DagManagementStateStore dagManagementStateStore, Void
state) {
+ protected void act(DagManagementStateStore dagManagementStateStore, Void
state) {
if (this.isBad) {
throw new RuntimeException("Simulating an exception!");
}
- return null;
- }
-
- @Override
- protected void sendNotification(Void result, EventSubmitter
eventSubmitter) throws IOException {
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 655532ab0..dea34b6cd 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -18,12 +18,14 @@
package org.apache.gobblin.service.modules.orchestration;
import java.util.Properties;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
import org.junit.Assert;
import org.quartz.JobDataMap;
import org.testng.annotations.Test;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+
public class FlowLaunchHandlerTest {
long eventToRevisit = 123000L;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index af1532b8a..ceb1b7b9f 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -126,7 +126,8 @@ public class MostlyMySqlDagManagementStateStoreTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, jobStatusRetriever);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
+ new MostlyMySqlDagManagementStateStore(config, null, null,
jobStatusRetriever);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
return dagManagementStateStore;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 6b71977a1..21b9fc28b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -35,13 +35,13 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;