This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 01036ccd4 [GOBBLIN-1868] Introduce FlowCompilationValidationHelper &
SharedFlowMetricsSingleton for sharing between Orchestrator & DagManager (#3731)
01036ccd4 is described below
commit 01036ccd45ae4ddf21a6e60acc84da70ccd097c9
Author: umustafi <[email protected]>
AuthorDate: Mon Aug 7 13:19:55 2023 -0700
[GOBBLIN-1868] Introduce FlowCompilationValidationHelper &
SharedFlowMetricsSingleton for sharing between Orchestrator & DagManager (#3731)
* refactoring skeleton, builds
* deploy works
* add javadocs
* fix failing test
* respond to comments
* Clean up in response to next review
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/core/GobblinServiceGuiceModule.java | 3 +
.../service/modules/orchestration/DagManager.java | 61 ++++--
.../modules/orchestration/Orchestrator.java | 243 ++++-----------------
.../scheduler/GobblinServiceJobScheduler.java | 7 +-
.../utils/FlowCompilationValidationHelper.java | 196 +++++++++++++++++
.../modules/utils/SharedFlowMetricsSingleton.java | 104 +++++++++
.../modules/orchestration/DagManagerFlowTest.java | 6 +-
.../modules/orchestration/OrchestratorTest.java | 34 ++-
8 files changed, 409 insertions(+), 245 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 1423e2d8c..af0c3461a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
@@ -186,6 +187,8 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(RequesterService.class)
.to(NoopRequesterService.class);
+ binder.bind(SharedFlowMetricsSingleton.class);
+
OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
if (serviceConfig.isTopologyCatalogEnabled()) {
binder.bind(TopologyCatalog.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 20e802bec..207c1c1dc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,6 +17,19 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -39,25 +52,9 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-import com.typesafe.config.ConfigFactory;
-
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
@@ -77,9 +74,12 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -206,8 +206,12 @@ public class DagManager extends AbstractIdleService {
protected final Long defaultJobStartSlaTimeMillis;
@Getter
private final JobStatusRetriever jobStatusRetriever;
- private final Orchestrator orchestrator;
+ private final FlowStatusGenerator flowStatusGenerator;
+ private final UserQuotaManager quotaManager;
+ private final SpecCompiler specCompiler;
+ private final boolean isFlowConcurrencyEnabled;
private final FlowCatalog flowCatalog;
+ private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
@@ -218,7 +222,8 @@ public class DagManager extends AbstractIdleService {
private volatile boolean isActive = false;
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
Orchestrator orchestrator,
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
FlowStatusGenerator flowStatusGenerator,
FlowCatalog flowCatalog, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
@@ -240,8 +245,18 @@ public class DagManager extends AbstractIdleService {
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
- this.orchestrator = orchestrator;
+ this.flowStatusGenerator = flowStatusGenerator;
+ this.specCompiler =
GobblinConstructorUtils.invokeConstructor(SpecCompiler.class,
ConfigUtils.getString(config,
+ ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
+ ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS), config);
+ this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+ ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
+ this.quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+ ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+ config);
this.flowCatalog = flowCatalog;
+ this.flowCompilationValidationHelper = new
FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
+ quotaManager, eventSubmitter, flowStatusGenerator,
isFlowConcurrencyEnabled);
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
this.failedDagRetentionTime =
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME,
DEFAULT_FAILED_DAG_RETENTION_TIME));
}
@@ -266,9 +281,10 @@ public class DagManager extends AbstractIdleService {
}
@Inject
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
Orchestrator orchestrator,
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
FlowStatusGenerator flowStatusGenerator,
FlowCatalog flowCatalog) {
- this(config, jobStatusRetriever, orchestrator, flowCatalog, true);
+ this(config, jobStatusRetriever, sharedFlowMetricsSingleton,
flowStatusGenerator, flowCatalog, true);
}
/** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s
and loading of any {@link Dag}s is done
@@ -483,7 +499,8 @@ public class DagManager extends AbstractIdleService {
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
- Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
orchestrator.handleChecksBeforeExecution(spec);
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
if (optionalJobExecutionPlanDag.isPresent()) {
addDag(optionalJobExecutionPlanDag.get(), true, true);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index b059d1744..186b6d81c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,6 +17,11 @@
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -25,33 +30,18 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.Getter;
import lombok.Setter;
-
+import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -69,10 +59,14 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -92,33 +86,34 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
protected final MetricContext metricContext;
protected final Optional<EventSubmitter> eventSubmitter;
- private final boolean flowConcurrencyFlag;
+ private final boolean isFlowConcurrencyEnabled;
@Getter
private Optional<Meter> flowOrchestrationSuccessFulMeter;
@Getter
private Optional<Meter> flowOrchestrationFailedMeter;
@Getter
private Optional<Timer> flowOrchestrationTimer;
- private Optional<Meter> skippedFlowsMeter;
@Setter
private FlowStatusGenerator flowStatusGenerator;
private UserQuotaManager quotaManager;
+ private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
private Optional<FlowTriggerHandler> flowTriggerHandler;
+ @Getter
+ private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
private final ClassAliasResolver<SpecCompiler> aliasResolver;
- private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
-
public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean
instrumentationEnabled,
- Optional<FlowTriggerHandler> flowTriggerHandler) {
+ Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerHandler = flowTriggerHandler;
+ this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
try {
String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -142,27 +137,29 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
this.flowOrchestrationSuccessFulMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
this.flowOrchestrationFailedMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
this.flowOrchestrationTimer =
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
- this.skippedFlowsMeter =
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
} else {
this.metricContext = null;
this.flowOrchestrationSuccessFulMeter = Optional.absent();
this.flowOrchestrationFailedMeter = Optional.absent();
this.flowOrchestrationTimer = Optional.absent();
- this.skippedFlowsMeter = Optional.absent();
this.eventSubmitter = Optional.absent();
}
- this.flowConcurrencyFlag = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+ this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
+ this.flowCompilationValidationHelper = new
FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
+ quotaManager, eventSubmitter, flowStatusGenerator,
isFlowConcurrencyEnabled);
}
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator,
Optional<TopologyCatalog> topologyCatalog,
- Optional<DagManager> dagManager, Optional<Logger> log,
Optional<FlowTriggerHandler> flowTriggerHandler) {
- this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true,
flowTriggerHandler);
+ Optional<DagManager> dagManager, Optional<Logger> log,
Optional<FlowTriggerHandler> flowTriggerHandler,
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
+ this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true,
flowTriggerHandler,
+ sharedFlowMetricsSingleton);
}
@@ -236,16 +233,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- // Only register the metric of flows that are scheduled, run once flows
should not be tracked indefinitely
- if (!flowGauges.containsKey(spec.getUri().toString()) &&
flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
- String flowCompiledGaugeName =
- MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED);
- flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
- ContextAwareGauge<Integer> gauge =
RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () ->
flowGauges.get(spec.getUri().toString()).state.value);
- RootMetricContext.get().register(flowCompiledGaugeName, gauge);
- }
-
- if (!isExecutionPermittedHandler(flowConfig, spec, flowName, flowGroup))
{
+ sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup,
flowName);
+ Optional<TimingEvent> flowCompilationTimer =
+ this.eventSubmitter.transform(submitter -> new
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+ Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
spec, flowGroup,
+ flowName);
+ if (!jobExecutionPlanDagOptional.isPresent()) {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
return;
}
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
@@ -272,21 +267,19 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
_log.info("Multi-active scheduler finished handling trigger event:
[{}, triggerEventTimestamp: {}]", flowAction,
triggerTimestampMillis);
} else {
- // Compile flow spec and do corresponding checks
- Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
- Optional<TimingEvent> flowCompilationTimer =
- this.eventSubmitter.transform(submitter -> new
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
-
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
jobExecutionPlanDagOptional.get();
if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
- populateFlowCompilationFailedEventMessage(spec, flowMetadata);
+
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
spec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.FAILED);
+
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+ SharedFlowMetricsSingleton.CompiledState.FAILED);
_log.warn("Cannot determine an executor to run on for Spec: " +
spec);
return;
}
- conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SUCCESSFUL);
+ sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+ SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
- addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDag);
if (flowCompilationTimer.isPresent()) {
flowCompilationTimer.get().stop(flowMetadata);
}
@@ -336,119 +329,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
- /**
- * If it is a scheduled flow (and hence, does not have flowExecutionId in
the FlowSpec) and the flow compilation is
- * successful, retrieve the flowExecutionId from the JobSpec.
- */
- public void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
Dag<JobExecutionPlan> jobExecutionPlanDag) {
-
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
- ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
- }
-
- /**
- * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
- * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
- * @return true if caller can proceed to execute flow, false otherwise
- * @throws IOException
- */
- public boolean isExecutionPermittedHandler(Config flowConfig, Spec spec,
String flowName, String flowGroup)
- throws IOException {
- boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
-
- Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
- if (!isExecutionPermitted(flowName, flowGroup, allowConcurrentExecution)) {
- _log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
- + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
- conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
- Instrumented.markMeter(this.skippedFlowsMeter);
- if (!((FlowSpec)
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
- // For ad-hoc flow, we might already increase quota, we need to
decrease here
- for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
- quotaManager.releaseQuota(dagNode);
- }
- }
-
- // Send FLOW_FAILED event
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
- + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
- if (this.eventSubmitter.isPresent()) {
- new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
- }
- return false;
- }
- return true;
- }
-
-
- /**
- * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
- * @param spec
- * @param flowMetadata
- */
- public void populateFlowCompilationFailedEventMessage(Spec spec,
- Map<String, String> flowMetadata) {
- // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
- // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
- // In this case, the current time is used as the flow executionId.
-
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
- Long.toString(System.currentTimeMillis()));
-
- String message = "Flow was not compiled successfully.";
- if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
- message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
- }
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
-
- Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
- new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
-
- if (flowCompileFailedTimer.isPresent()) {
- flowCompileFailedTimer.get().stop(flowMetadata);
- }
- }
-
- /**
- * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
- * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
- * caller.
- * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
- */
- public Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(FlowSpec
flowSpec)
- throws IOException, InterruptedException {
- Config flowConfig = flowSpec.getConfig();
- String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
- String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- if (!isExecutionPermittedHandler(flowConfig, flowSpec, flowName,
flowGroup)) {
- return Optional.absent();
- }
-
- //Wait for the SpecCompiler to become healthy.
- this.getSpecCompiler().awaitHealthy();
-
- Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
- Optional<TimingEvent> flowCompilationTimer =
- this.eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
-
- if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
- populateFlowCompilationFailedEventMessage(flowSpec, flowMetadata);
- return Optional.absent();
- }
-
- addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
- if (flowCompilationTimer.isPresent()) {
- flowCompilationTimer.get().stop(flowMetadata);
- }
- return Optional.of(jobExecutionPlanDag);
- }
-
public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException,
InterruptedException {
- Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
handleChecksBeforeExecution(flowSpec);
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+ } else {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
}
}
@@ -469,22 +356,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
}
}
- /**
- * Check if a FlowSpec instance is allowed to run.
- *
- * @param flowName
- * @param flowGroup
- * @param allowConcurrentExecution
- * @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
- */
- private boolean isExecutionPermitted(String flowName, String flowGroup,
boolean allowConcurrentExecution) {
- if (allowConcurrentExecution) {
- return true;
- } else {
- return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
- }
- }
-
public void remove(Spec spec, Properties headers) throws IOException {
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
@@ -529,18 +400,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
}
}
- /**
- * Updates the flowgauge related to the spec if the gauge is being tracked
for the flow
- * @param spec FlowSpec to be updated
- * @param state desired state to set the gauge
- */
- private void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState
state) {
- if (this.flowGauges.containsKey(spec.getUri().toString())) {
- this.flowGauges.get(spec.getUri().toString()).setState(state);
- }
- }
-
-
@Nonnull
@Override
public MetricContext getMetricContext() {
@@ -566,22 +425,4 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
public void switchMetricContext(MetricContext context) {
throw new UnsupportedOperationException();
}
-
- @Setter
- private static class FlowCompiledState {
- private CompiledState state = CompiledState.UNKNOWN;
- }
-
- private enum CompiledState {
- FAILED(-1),
- UNKNOWN(0),
- SUCCESSFUL(1),
- SKIPPED(2);
-
- public int value;
-
- CompiledState(int value) {
- this.value = value;
- }
- }
}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d860ed485..f71093f52 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -31,6 +31,7 @@ import java.util.TimeZone;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
@@ -208,10 +209,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
public GobblinServiceJobScheduler(String serviceName, Config config,
FlowStatusGenerator flowStatusGenerator,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
Optional<DagManager> dagManager, Optional<UserQuotaManager>
quotaManager, SchedulerService schedulerService,
- Optional<Logger> log, boolean warmStandbyEnabled, Optional
<FlowTriggerHandler> flowTriggerHandler)
+ Optional<Logger> log, boolean warmStandbyEnabled, Optional
<FlowTriggerHandler> flowTriggerHandler,
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
- new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log, flowTriggerHandler),
+ new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log, flowTriggerHandler,
+ sharedFlowMetricsSingleton),
schedulerService, quotaManager, log, warmStandbyEnabled,
flowTriggerHandler);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
new file mode 100644
index 000000000..747ebc2bc
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Helper class with functionality meant to be re-used between the DagManager
and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateful,
+ * requiring all stateful pieces to be passed as input from the caller upon
instantiating the helper.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development, so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+@Slf4j
+@Data
+public final class FlowCompilationValidationHelper {
+ private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
+ private final SpecCompiler specCompiler;
+ private final UserQuotaManager quotaManager;
+ private final Optional<EventSubmitter> eventSubmitter;
+ private final FlowStatusGenerator flowStatusGenerator;
+ private final boolean isFlowConcurrencyEnabled;
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec
flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Optional<TimingEvent> flowCompilationTimer =
+ this.eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+ validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName);
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (!jobExecutionPlanDagOptional.isPresent()) {
+ return Optional.absent();
+ }
+
+ if (jobExecutionPlanDagOptional.get() == null ||
jobExecutionPlanDagOptional.get().isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDagOptional.get());
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return jobExecutionPlanDagOptional;
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow
and compile spec, else absent Optional
+ * @throws IOException
+ */
+ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Config flowConfig, Spec spec,
+ String flowGroup, String flowName) throws IOException {
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+ ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
isFlowConcurrencyEnabled);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution)) {
+ return Optional.of(jobExecutionPlanDag);
+ } else {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+ SharedFlowMetricsSingleton.CompiledState.SKIPPED);
+
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
+ if (!isScheduledFlow((FlowSpec) spec)) {
+ // For ad-hoc flow, we might already increase quota, we need to
decrease here
+ for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ quotaManager.releaseQuota(dagNode);
+ }
+ }
+
+ // Send FLOW_FAILED event
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
+ + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
+ if (eventSubmitter.isPresent()) {
+ new TimingEvent(eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ }
+ return Optional.absent();
+ }
+ }
+
+ /**
+ * Check if a FlowSpec instance is allowed to run.
+ *
+ * @param flowName
+ * @param flowGroup
+ * @param allowConcurrentExecution
+ * @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
+ */
+ private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowName, String flowGroup,
+ boolean allowConcurrentExecution) {
+ return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+ }
+
+ /**
+ * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
+ * @param spec
+ * @param flowMetadata
+ */
+ public static void
populateFlowCompilationFailedEventMessage(Optional<EventSubmitter>
eventSubmitter, Spec spec,
+ Map<String, String> flowMetadata) {
+ // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
+ }
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+ Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
+ new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+ if (flowCompileFailedTimer.isPresent()) {
+ flowCompileFailedTimer.get().stop(flowMetadata);
+ }
+ }
+
+ /**
+ * If it is a scheduled flow (and hence, does not have flowExecutionId in
the FlowSpec) and the flow compilation is
+ * successful, retrieve the flowExecutionId from the JobSpec.
+ */
+ public static void addFlowExecutionIdIfAbsent(Map<String,String>
flowMetadata,
+ Dag<JobExecutionPlan> jobExecutionPlanDag) {
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
+ ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+
+ /**
+ * Return true if the spec contains a schedule, false otherwise.
+ */
+ public static boolean isScheduledFlow(FlowSpec spec) {
+ return
spec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY);
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java
new file mode 100644
index 000000000..ab9bb5f97
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator}
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsSingleton {
+ protected final MetricContext metricContext;
+ private Map<URI, FlowCompiledState> flowGaugeStateBySpecUri =
Maps.newHashMap();
+ private Optional<Meter> skippedFlowsMeter;
+
+ @Setter
+ public static class FlowCompiledState {
+ private CompiledState state = CompiledState.UNKNOWN;
+ }
+
+ public enum CompiledState {
+ FAILED(-1),
+ UNKNOWN(0),
+ SUCCESSFUL(1),
+ SKIPPED(2);
+
+ public int value;
+
+ CompiledState(int value) {
+ this.value = value;
+ }
+ }
+
+ @Inject
+ public SharedFlowMetricsSingleton(Config config) {
+ this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
+ SharedFlowMetricsSingleton.class);
+ this.skippedFlowsMeter =
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+ }
+
+ /**
+ * Adds a new FlowGauge to the metric context if one does not already exist
for this flow spec
+ */
+ public void addFlowGauge(Spec spec, Config flowConfig, String flowGroup,
String flowName) {
+ // Only register the metric of flows that are scheduled, run once flows
should not be tracked indefinitely
+ if (!flowGaugeStateBySpecUri.containsKey(spec.getUri())
+ && flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ String flowCompiledGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup,
flowName,
+ ServiceMetricNames.COMPILED);
+ flowGaugeStateBySpecUri.put(spec.getUri(), new FlowCompiledState());
+ ContextAwareGauge<Integer> gauge =
+ RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName,
+ () -> flowGaugeStateBySpecUri.get(spec.getUri()).state.value);
+ RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+ }
+ }
+ /**
+ * Updates the flowgauge related to the spec if the gauge is being tracked
for the flow
+ * @param spec FlowSpec to be updated
+ * @param state desired state to set the gauge
+ */
+ public void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState
state) {
+ if (flowGaugeStateBySpecUri.containsKey(spec.getUri())) {
+ flowGaugeStateBySpecUri.get(spec.getUri()).setState(state);
+ }
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 603be2a17..5445a2096 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -342,8 +344,8 @@ class CancelPredicate implements Predicate<Void> {
class MockedDagManager extends DagManager {
public MockedDagManager(Config config, boolean instrumentationEnabled) {
- super(config, createJobStatusRetriever(),
Mockito.mock(Orchestrator.class), Mockito.mock(FlowCatalog.class),
- instrumentationEnabled);
+ super(config, createJobStatusRetriever(),
Mockito.mock(SharedFlowMetricsSingleton.class),
+ Mockito.mock(FlowStatusGenerator.class),
Mockito.mock(FlowCatalog.class), instrumentationEnabled);
}
private static JobStatusRetriever createJobStatusRetriever() {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index fafcc9605..2d3f4b3f9 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -18,34 +18,23 @@
package org.apache.gobblin.service.modules.orchestration;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.typesafe.config.Config;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
-
import org.apache.commons.io.FileUtils;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.typesafe.config.Config;
-
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
@@ -53,9 +42,17 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
@@ -111,7 +108,8 @@ public class OrchestratorTest {
this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.mockStatusGenerator, Optional.of(this.topologyCatalog),
Optional.<DagManager>absent(), Optional.of(logger),
- Optional.of(this._mockFlowTriggerHandler));
+ Optional.of(this._mockFlowTriggerHandler), new
SharedFlowMetricsSingleton(
+ ConfigUtils.propertiesToConfig(orchestratorProperties)));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
@@ -332,7 +330,7 @@ public class OrchestratorTest {
@Test (dependsOnMethods = "deleteFlowSpec")
public void doNotRegisterMetricsAdhocFlows() throws Exception {
- MetricContext metricContext = this.orchestrator.getMetricContext();
+ MetricContext metricContext =
this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext();
this.topologyCatalog.getInitComplete().countDown(); // unblock
orchestration
Properties flowProps = new Properties();
flowProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "flow0");