phet commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284711212
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever
jobStatusRetriever, Orchestr
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
- this.orchestrator = orchestrator;
+ this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+ this.flowStatusGenerator = flowStatusGenerator;
+ try {
+ String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+ if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+ specCompilerClassName =
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
Review Comment:
I prefer `ConfigUtils.getString`, as used below, which takes a default, so
all of this can happen in a single line, without needing an `if` block.
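To illustrate the single-line form, here is a minimal, self-contained sketch.
It assumes a plain `Map` standing in for `Config` and a hypothetical
`getString` helper that mirrors the three-argument
`ConfigUtils.getString(config, key, default)` call already used earlier in
this ctor. The key and default values are placeholders, not the real
`ServiceConfigKeys` constants.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigDefaultSketch {
  // Placeholder values; the real ones live in ServiceConfigKeys.
  static final String FLOWCOMPILER_CLASS_KEY = "example.flowCompiler.class";
  static final String DEFAULT_FLOWCOMPILER_CLASS = "ExampleDefaultCompiler";

  // Hypothetical stand-in for ConfigUtils.getString(config, key, default).
  static String getString(Map<String, String> config, String key, String defaultValue) {
    String value = config.get(key);
    return value != null ? value : defaultValue;
  }

  // Shape of the code under review: assign the default, then overwrite it inside an if block.
  static String resolveWithIfBlock(Map<String, String> config) {
    String specCompilerClassName = DEFAULT_FLOWCOMPILER_CLASS;
    if (config.containsKey(FLOWCOMPILER_CLASS_KEY)) {
      specCompilerClassName = config.get(FLOWCOMPILER_CLASS_KEY);
    }
    return specCompilerClassName;
  }

  // Suggested shape: lookup and fallback in a single expression.
  static String resolveWithDefault(Map<String, String> config) {
    return getString(config, FLOWCOMPILER_CLASS_KEY, DEFAULT_FLOWCOMPILER_CLASS);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    System.out.println(resolveWithDefault(config));
    config.put(FLOWCOMPILER_CLASS_KEY, "CustomCompiler");
    System.out.println(resolveWithDefault(config));
  }
}
```

Both methods resolve to the same class name for the same input; the second
just leaves less to read in the ctor.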
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever
jobStatusRetriever, Orchestr
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
- this.orchestrator = orchestrator;
+ this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+ this.flowStatusGenerator = flowStatusGenerator;
+ try {
+ String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+ if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+ specCompilerClassName =
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+ }
+ log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+ this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
config);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException |
+ ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ this.flowConcurrencyFlag = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+ ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
Review Comment:
I'm unclear on what's meant by "flow concurrency". AFAIR "allow concurrent
executions" is flow-scoped rather than system-level, isn't it?
Also, the name `flowConcurrencyFlag` is less preferable to one that precisely
states the meaning/sense of the flag, like `isFlowConcurrencyEnabled` or
`flowConcurrencyDisabled`.
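For what it's worth, here is a sketch of how I read the intent, under the
assumption that the service-level setting is only a fallback that a flow's own
`flow.allowConcurrentExecution` setting overrides. The field name
`isFlowConcurrencyAllowedByDefault`, the class name, and the `Map`-based flow
config are all hypothetical and only for illustration.

```java
import java.util.Map;

final class FlowConcurrencySketch {
  // Key name as quoted in the FLOW_FAILED message of this PR; the Map stands in for a flow's Config.
  static final String FLOW_ALLOW_CONCURRENT_EXECUTION = "flow.allowConcurrentExecution";

  // Hypothetical name: states the sense of the flag and that it is only a service-level default.
  private final boolean isFlowConcurrencyAllowedByDefault;

  FlowConcurrencySketch(boolean isFlowConcurrencyAllowedByDefault) {
    this.isFlowConcurrencyAllowedByDefault = isFlowConcurrencyAllowedByDefault;
  }

  // A flow-level setting wins; the service-level default applies only when the flow is silent.
  boolean allowsConcurrentExecution(Map<String, String> flowConfig) {
    String flowLevel = flowConfig.get(FLOW_ALLOW_CONCURRENT_EXECUTION);
    return flowLevel != null ? Boolean.parseBoolean(flowLevel) : isFlowConcurrencyAllowedByDefault;
  }
}
```

If that reading is wrong and the flag really is system-level, then the name
should say that instead.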
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator}
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+ protected final MetricContext metricContext;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+ private Optional<Meter> skippedFlowsMeter;
+
+ @Inject
+ public SharedFlowMetricsContainer(Config config) {
+ this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
SharedFlowMetricsContainer.class);
+ this.skippedFlowsMeter =
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+ }
+
+ /**
+ * Adds a new FlowGauge to the metric context if one does not already exist
for this flow spec
+ */
+ public void addFlowGauge(Spec spec, Config flowConfig, String flowName,
String flowGroup) {
Review Comment:
I find widely employed, non-semantic types like `String` error-prone,
because particular instances are readily confused with one another, e.g.
through param/arg misordering. Since `flowGroup` is less specific than
`flowName`, my habit is to put it first, which appears to be how
`MetricRegistry.name` wants them as well... and yet they're swapped in this
method's params.
Let's be consistent throughout, so that a misordering easily jumps out at
maintainers. Even better would be to replace the two `String` params with a
single `FlowId`.
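A rough sketch of the idea, not a proposal for the exact type: `FlowIdSketch`
and `FlowGaugeSketch` below are hypothetical stand-ins (not the existing
`FlowId` class), and the metric prefix and suffix are placeholders. The point
is that the group/name order gets decided in exactly one place.

```java
import java.util.Objects;

// Hypothetical value type for illustration only; not the existing FlowId class.
final class FlowIdSketch {
  private final String flowGroup;
  private final String flowName;

  private FlowIdSketch(String flowGroup, String flowName) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
  }

  // The only place where the (group, name) order matters: less specific first.
  static FlowIdSketch of(String flowGroup, String flowName) {
    return new FlowIdSketch(flowGroup, flowName);
  }

  String getFlowGroup() {
    return flowGroup;
  }

  String getFlowName() {
    return flowName;
  }

  // Dot-joined, in the same spirit as MetricRegistry.name(prefix, group, name, suffix).
  String toMetricName(String prefix, String suffix) {
    return String.join(".", prefix, flowGroup, flowName, suffix);
  }
}

final class FlowGaugeSketch {
  // A signature that takes the id cannot be called with group and name swapped.
  static String gaugeNameFor(FlowIdSketch flowId) {
    return flowId.toMetricName("ExamplePrefix", "compiled"); // placeholder prefix and suffix
  }
}
```

With two adjacent `String` params the compiler accepts either order; with a
single id param there is nothing left to swap at the call site.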
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler
specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator
flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler,
quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec,
flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer
sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager,
Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean
flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
Review Comment:
I commented elsewhere about the lack of uniform ordering between the
`flowName` and `flowGroup` params, as well as a way to sidestep the issue via
a `FlowId` type.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler
specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator
flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler,
quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec,
flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer
sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager,
Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean
flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+ ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
flowConcurrencyFlag);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution)) {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsContainer.CompiledState.SKIPPED);
+
Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+ if (!((FlowSpec)
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ // For ad-hoc flow, we might already increase quota, we need to
decrease here
+ for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ quotaManager.releaseQuota(dagNode);
+ }
+ }
+
+ // Send FLOW_FAILED event
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
+ + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
+ if (eventSubmitter.isPresent()) {
+ new TimingEvent(eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ }
+ return false;
+ }
+ return true;
Review Comment:
I'm not sure where we are in the nesting, or whether I've got this right...
*but that's the problem I'm suggesting we solve here!* ;)
How about:
```
if (isExecPermitted(...)) {
return true;
} else {
... // all the many other lines of code
}
```
?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
Review Comment:
Perhaps something more specific... `compileToExecutionPlan`?
`createExecutionPlanIfValid`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -206,7 +212,12 @@ public String toString() {
protected final Long defaultJobStartSlaTimeMillis;
@Getter
private final JobStatusRetriever jobStatusRetriever;
- private final Orchestrator orchestrator;
+ private final FlowStatusGenerator flowStatusGenerator;
+ private final UserQuotaManager quotaManager;
+ private final ClassAliasResolver<SpecCompiler> aliasResolver;
Review Comment:
is this used outside the ctor? if not, why an instance member?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator}
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
Review Comment:
Naming-wise: "container" has a very specific meaning: an execution
environment that hosts user-supplied code, e.g. "my app runs in a container"
or "container deployment failed". More rarely, it's a synonym for
"collection", especially as the abstract concept, e.g. "Map, List, and
PriorityQueue are Java container types."
This class looks like it could simply be `SharedFlowMetrics`,
`SharedFlowMetricsSingleton`, or `SharedFlowMetricsTracker`...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever
jobStatusRetriever, Orchestr
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
- this.orchestrator = orchestrator;
+ this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+ this.flowStatusGenerator = flowStatusGenerator;
+ try {
+ String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+ if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+ specCompilerClassName =
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+ }
+ log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+ this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
config);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException |
+ ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
This ctor is 46 lines long now, with 13 of them spent initializing this
`SpecCompiler`. Let's streamline and minimize the distraction by abstracting
that into a one-line method call, like:
```
this.specCompiler = createSpecCompiler(config);
```
Alternatively, is there a way to use `GobblinConstructorUtils`, like you
did below for the `UserQuotaManager`? E.g.:
https://github.com/apache/gobblin/blob/2917b6311949d25f898a6691fe0730d344299d25/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/GobblinConstructorUtils.java#L110
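For the first option (the helper method), a self-contained sketch of the shape
I have in mind. Everything here is a stand-in so it compiles on its own:
`CompilerSketch` plays the role of `SpecCompiler`, a `Map` plays the role of
`Config`, a local alias table plays the role of `ClassAliasResolver`, and the
key/alias constants are placeholders. It uses plain `java.lang.reflect` rather
than any Gobblin utility.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SpecCompiler.
interface CompilerSketch {
  String describe();
}

// Hypothetical default implementation with a single-argument constructor taking the config.
final class DefaultCompilerSketch implements CompilerSketch {
  private final Map<String, String> config;

  public DefaultCompilerSketch(Map<String, String> config) {
    this.config = config;
  }

  @Override
  public String describe() {
    return "default compiler, " + config.size() + " config entries";
  }
}

final class DagManagerCtorSketch {
  static final String FLOWCOMPILER_CLASS_KEY = "example.flowCompiler.class"; // placeholder key
  static final String DEFAULT_FLOWCOMPILER_ALIAS = "default";                // placeholder alias

  private final CompilerSketch specCompiler;

  DagManagerCtorSketch(Map<String, String> config) {
    // The ctor keeps a single line; the reflection details live in the helper.
    this.specCompiler = createSpecCompiler(config);
  }

  CompilerSketch getSpecCompiler() {
    return specCompiler;
  }

  private static CompilerSketch createSpecCompiler(Map<String, String> config) {
    // Alias table is local to the helper, so it needs no instance field.
    Map<String, String> aliases = new HashMap<>();
    aliases.put(DEFAULT_FLOWCOMPILER_ALIAS, DefaultCompilerSketch.class.getName());

    String nameOrAlias = config.getOrDefault(FLOWCOMPILER_CLASS_KEY, DEFAULT_FLOWCOMPILER_ALIAS);
    String className = aliases.getOrDefault(nameOrAlias, nameOrAlias);
    try {
      return (CompilerSketch) Class.forName(className).getConstructor(Map.class).newInstance(config);
    } catch (ReflectiveOperationException e) {
      // Common supertype of the five reflection exceptions caught individually in the PR.
      throw new RuntimeException("Could not instantiate spec compiler " + className, e);
    }
  }
}
```

A side effect of this shape is that the resolver becomes a local of the
helper, which also answers my question about the `aliasResolver` member.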
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator}
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+ protected final MetricContext metricContext;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
Review Comment:
Everywhere below, I see `spec.getUri().toString()`... can't we simply use a
`URI` as this map's key? Doing so also carries richer semantics.
Also: up to you, but my naming convention here would be
`flowGaugeStateBySpecUri`.
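A small sketch of the `URI`-keyed map, with hypothetical names throughout:
`FlowGaugeRegistrySketch` and its `CompiledStateSketch` enum are stand-ins I
made up for illustration, not the classes in this PR.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

final class FlowGaugeRegistrySketch {
  // Hypothetical stand-in for the per-flow compiled state.
  enum CompiledStateSketch { COMPILED, SKIPPED }

  // Keyed by the spec's URI directly, so callers never need spec.getUri().toString().
  private final Map<URI, CompiledStateSketch> flowGaugeStateBySpecUri = new HashMap<>();

  // Registers a state only if the spec URI has none yet; returns true if it was added.
  boolean addFlowGauge(URI specUri, CompiledStateSketch initialState) {
    return flowGaugeStateBySpecUri.putIfAbsent(specUri, initialState) == null;
  }

  // Updates only flows that were registered before; returns true if an update happened.
  boolean updateIfPresent(URI specUri, CompiledStateSketch state) {
    return flowGaugeStateBySpecUri.replace(specUri, state) != null;
  }

  CompiledStateSketch stateOf(URI specUri) {
    return flowGaugeStateBySpecUri.get(specUri);
  }
}
```

`URI` implements `equals` and `hashCode`, so it works as a `HashMap` key
without the string round trip.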
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler
specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator
flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler,
quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec,
flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer
sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager,
Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean
flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+ ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
flowConcurrencyFlag);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution)) {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsContainer.CompiledState.SKIPPED);
+
Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+ if (!((FlowSpec)
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
Review Comment:
NBD... but I do imagine it being clearer to read:
```
if (!isScheduledFlow(spec)) ...
```
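One possible shape for that helper, as a sketch only. `isScheduledFlow` does not exist in the PR, it takes plain `Properties` here to stay self-contained, and the key literal is assumed to match `ConfigurationKeys.JOB_SCHEDULE_KEY`. Note that the quoted line tests the negation (the ad-hoc case), so the call site would be `if (!isScheduledFlow(...))`:
```java
import java.util.Properties;

// Hypothetical helper; not part of the PR.
class ScheduledFlowSketch {
  // Assumed to mirror ConfigurationKeys.JOB_SCHEDULE_KEY; the literal is illustrative.
  static final String JOB_SCHEDULE_KEY = "job.schedule";

  /** A flow counts as scheduled when its properties carry a schedule entry. */
  static boolean isScheduledFlow(Properties flowProps) {
    return flowProps.containsKey(JOB_SCHEDULE_KEY);
  }

  public static void main(String[] args) {
    Properties adHoc = new Properties();
    Properties scheduled = new Properties();
    scheduled.setProperty(JOB_SCHEDULE_KEY, "0 0 * * * ?");

    // Only the ad-hoc flow would take the quota-release branch.
    System.out.println("adHoc releases quota: " + !isScheduledFlow(adHoc));
    System.out.println("scheduled releases quota: " + !isScheduledFlow(scheduled));
  }
}
```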
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler
specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator
flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler,
quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec,
flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer
sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager,
Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean
flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+ ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
flowConcurrencyFlag);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution)) {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
Review Comment:
(I notice we log them as group, then name, so that may be the order
maintainers become most familiar with)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler
specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator
flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler,
quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec,
flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer
sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager,
Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean
flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+ ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
flowConcurrencyFlag);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution)) {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsContainer.CompiledState.SKIPPED);
+
Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+ if (!((FlowSpec)
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ // For ad-hoc flow, we might already increase quota, we need to
decrease here
+ for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ quotaManager.releaseQuota(dagNode);
+ }
+ }
+
+ // Send FLOW_FAILED event
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
+ + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
+ if (eventSubmitter.isPresent()) {
+ new TimingEvent(eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Check if a FlowSpec instance is allowed to run.
+ *
+ * @param flowName
+ * @param flowGroup
+ * @param allowConcurrentExecution
+ * @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
+ */
+ private static boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowName,
+ String flowGroup, boolean allowConcurrentExecution) {
+ if (allowConcurrentExecution) {
+ return true;
+ } else {
+ return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+ }
Review Comment:
nit:
```
return allowConcurrent || !fsg.isFlowRunning(...)
```
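To spell out why the one-liner is equivalent, here is a small sketch that puts both forms side by side. `BooleanSupplier` stands in for the `flowStatusGenerator.isFlowRunning(flowName, flowGroup)` call, and the class and method names are made up for illustration:
```java
import java.util.function.BooleanSupplier;

// Illustrative only; isFlowRunning stands in for the FlowStatusGenerator lookup.
class ExecutionPermittedSketch {
  /** The if/else form as written in the PR. */
  static boolean verbose(boolean allowConcurrentExecution, BooleanSupplier isFlowRunning) {
    if (allowConcurrentExecution) {
      return true;
    } else {
      return !isFlowRunning.getAsBoolean();
    }
  }

  /** The single-expression form from the nit. */
  static boolean concise(boolean allowConcurrentExecution, BooleanSupplier isFlowRunning) {
    // || short-circuits, so the status lookup is skipped when concurrency is allowed,
    // exactly as in the if/else form.
    return allowConcurrentExecution || !isFlowRunning.getAsBoolean();
  }
}
```
Both forms agree on all four input combinations, and neither consults the flow status when concurrent execution is allowed.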
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator}
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+ protected final MetricContext metricContext;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+ private Optional<Meter> skippedFlowsMeter;
+
+ @Inject
+ public SharedFlowMetricsContainer(Config config) {
+ this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
SharedFlowMetricsContainer.class);
+ this.skippedFlowsMeter =
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+ }
+
+ /**
+ * Adds a new FlowGauge to the metric context if one does not already exist
for this flow spec
+ */
+ public void addFlowGauge(Spec spec, Config flowConfig, String flowName,
String flowGroup) {
+ // Only register the metric of flows that are scheduled, run once flows
should not be tracked indefinitely
+ if (!flowGauges.containsKey(spec.getUri().toString()) &&
flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ String flowCompiledGaugeName =
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED);
+ flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
+ ContextAwareGauge<Integer> gauge =
+ RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName,
+ () -> flowGauges.get(spec.getUri().toString()).state.value);
+ RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+ }
+ }
+ /**
+ * Updates the flowgauge related to the spec if the gauge is being tracked
for the flow
+ * @param spec FlowSpec to be updated
+ * @param state desired state to set the gauge
+ */
+ public void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState
state) {
+ if (flowGauges.containsKey(spec.getUri().toString())) {
+ flowGauges.get(spec.getUri().toString()).setState(state);
+ }
+ }
+
+ @Setter
+ public static class FlowCompiledState {
Review Comment:
nit: given `FlowCompiledState` is already referenced above, I'd suggest
introducing it beforehand, by moving this def to the top of the enclosing class
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler
specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator
flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler,
quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec,
flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
Review Comment:
is this different from the `compileFlow` call that just happened inside
`isExecutionPermittedHandler`? if not, rather than repeat it, let's return
`Optional<Dag<JobExecutionPlan>>` from the latter, rather than the boolean. if
there's a reason NOT to do that, then please add a comment here describing the
pitfall, so a maintainer gets a clear idea of what to watch out for before
potentially trying that
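A rough sketch of the compile-once shape proposed above, under stated assumptions: the generic `D` stands in for `Dag<JobExecutionPlan>`, the two functional parameters stand in for `specCompiler.compileFlow(spec)` and the concurrency check, and `java.util.Optional` is used instead of the Guava `Optional` in the PR so the example stays self-contained. `compileIfPermitted` is a hypothetical name:
```java
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Illustrative only; not the PR's API.
class CompileOnceSketch {
  /**
   * Compiles once, then returns the compiled result only when execution is permitted.
   * Compilation still happens first because the quoted handler needs the DAG's start
   * nodes to release quota in the not-permitted branch.
   */
  static <D> Optional<D> compileIfPermitted(Supplier<D> compiler, BooleanSupplier isPermitted) {
    D dag = compiler.get(); // the single compilation
    if (!isPermitted.getAsBoolean()) {
      // The quoted handler releases quota and emits FLOW_FAILED at this point.
      return Optional.empty();
    }
    // A null result (failed compilation) also surfaces as an empty Optional.
    return Optional.ofNullable(dag);
  }
}
```
The caller would then reuse the returned DAG instead of calling `compileFlow` a second time. Whether the second call is safe to drop depends on whether it must run after `awaitHealthy()`, which the quoted code does not settle.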
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler
specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator
flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler,
quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec,
flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer
sharedFlowMetricsContainer,
Review Comment:
nit: a "handler" is usually a class, or a func/method reference invoked as a
callback. this also isn't really a predicate (e.g. `isXYZ`), since it performs
several side effects, like quota mgmt, etc.
maybe `validateExecutionOrFinalizeRejection`? (sorry, you're likely to find
a better name)
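A minimal sketch of the naming distinction, assuming hypothetical stand-ins throughout: the class, the `runningFlows` set, and the `rejectedCount` counter are illustrative only and are not Gobblin APIs. An `is...` name fits a method that only reads state, while a method that also records a rejection reads more clearly with a name that says so.

```java
import java.util.HashSet;
import java.util.Set;

public class ExecutionValidationNamingSketch {
  // Hypothetical stand-in for running-flow bookkeeping; not Gobblin's API.
  private final Set<String> runningFlows = new HashSet<>();
  private int rejectedCount = 0;

  /** Pure predicate: reads state and changes nothing, so an `is...` name fits. */
  public boolean isFlowRunning(String flowName) {
    return runningFlows.contains(flowName);
  }

  /**
   * Validates the request and, on rejection, also records the rejection.
   * Because of that side effect, the name describes both outcomes.
   */
  public boolean validateExecutionOrFinalizeRejection(String flowName, boolean allowConcurrent) {
    if (!allowConcurrent && isFlowRunning(flowName)) {
      // Stands in for emitting a FLOW FAILED event, releasing quota, etc.
      rejectedCount++;
      return false;
    }
    runningFlows.add(flowName);
    return true;
  }

  public int getRejectedCount() {
    return rejectedCount;
  }
}
```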
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
Review Comment:
`FlowCompilationValidationHelper`?
also, I completely agree w/ the statelessness (and `final`ity) of this
abstraction, but just for comprehensibility I'd suggest creating a ctor merely
to separate "the args that stay the same" from "the changing ones", to put the
latter front and center.
e.g. if not called like this:
```
this.fcHelper = new FlowCompilationValidationHelper(
sharedFlowMetricsContainer, specCompiler, quotaManager,
optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled);
... // way later on
Optional<Dag<JobExecutionPlan>> optDag =
fcHelper.createExecutionPlanIfValid(flowSpec);
```
then at least like this:
```
Optional<Dag<JobExecutionPlan>> optDag = new FlowCompilationValidationHelper(
sharedFlowMetricsContainer, specCompiler, quotaManager,
optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled)
.createExecutionPlanIfValid(flowSpec);
```
p.s. populating the compilation failure message could remain `static`
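A rough sketch of the shape this could take, with assumptions stated plainly: `Compiler`, the `String` plan type, and the failure-message text are hypothetical stand-ins for `SpecCompiler`, `Dag<JobExecutionPlan>`, and the real event payload, and `java.util.Optional` is used here in place of Guava's `Optional` only to keep the example self-contained.

```java
import java.util.Optional;
import java.util.function.Consumer;

public final class FlowCompilationValidationHelperSketch {
  /** Hypothetical stand-in for SpecCompiler; returns null when compilation fails. */
  public interface Compiler {
    String compile(String flowSpec);
  }

  // Args that stay the same across calls are captured once in the ctor...
  private final Compiler compiler;
  private final Consumer<String> log;

  public FlowCompilationValidationHelperSketch(Compiler compiler, Consumer<String> log) {
    this.compiler = compiler;
    this.log = log;
  }

  // ...so the changing arg is the only parameter at the call site.
  public Optional<String> createExecutionPlanIfValid(String flowSpec) {
    String plan = compiler.compile(flowSpec);
    if (plan == null || plan.isEmpty()) {
      log.accept(compilationFailureMessage(flowSpec));
      return Optional.empty();
    }
    return Optional.of(plan);
  }

  // Uses no instance state, so it can remain static.
  public static String compilationFailureMessage(String flowSpec) {
    return "Flow was not compiled successfully: " + flowSpec;
  }
}
```

The fields are all `final` and set only in the ctor, so the helper stays immutable even though it is no longer a bag of static methods.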