[https://issues.apache.org/jira/browse/GOBBLIN-1868?focusedWorklogId=874724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-874724]
ASF GitHub Bot logged work on GOBBLIN-1868:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Aug/23 19:51
Start Date: 04/Aug/23 19:51
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284711212
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
- this.orchestrator = orchestrator;
+ this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+ this.flowStatusGenerator = flowStatusGenerator;
+ try {
+ String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+ if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+ specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
Review Comment:
I prefer the use of `ConfigUtils.getString` below, which takes a default, so
all this can happen in a single line, w/o needing an `if` block.
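Following that suggestion, the lookup could collapse to a single call, e.g. `String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);`, the same get-with-default form the ctor already uses for `JOB_START_SLA_UNITS`. The self-contained sketch below only illustrates the pattern: a plain `Map` stands in for Typesafe `Config`, and the key string and default class name are hypothetical placeholders, not Gobblin's actual values.

```java
import java.util.Map;

// Sketch only: a Map stands in for com.typesafe.config.Config, and the key/default
// below are hypothetical placeholders for the ServiceConfigKeys constants.
final class SpecCompilerConfig {
  static final String FLOWCOMPILER_CLASS_KEY = "example.flowCompiler.class";
  static final String DEFAULT_FLOWCOMPILER_CLASS = "ExampleDefaultCompiler";

  private SpecCompilerConfig() {}

  // Get-with-default contract: the value at `key` if present, otherwise `def`.
  static String getString(Map<String, String> config, String key, String def) {
    return config.getOrDefault(key, def);
  }

  // One expression replaces the hasPath(...) check plus the reassignment inside an if block.
  static String specCompilerClassName(Map<String, String> config) {
    return getString(config, FLOWCOMPILER_CLASS_KEY, DEFAULT_FLOWCOMPILER_CLASS);
  }
}
```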
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
- this.orchestrator = orchestrator;
+ this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+ this.flowStatusGenerator = flowStatusGenerator;
+ try {
+ String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+ if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+ specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+ }
+ log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+ this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
+ ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+ ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
Review Comment:
I'm unclear what's meant by "flow concurrency". AFAIR "allow concurrent
executions" is flow-scoped, rather than system-level, ain't it?
also, the name `flowConcurrencyFlag` is less preferable than one that precisely
states the meaning/sense of the flag, like `isFlowConcurrencyEnabled` or
`flowConcurrencyDisabled`
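On the scoping question: `isExecutionPermittedHandler` (further down in this PR) reads `ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, flowConcurrencyFlag)`, so the service-level value appears to act only as a default that each flow can override. A minimal sketch of naming the field after that role, assuming a `Map` in place of the flow's `Config`; the class and field names are hypothetical, and the property string is the one quoted in the PR's FLOW_FAILED message.

```java
import java.util.Map;

// Hypothetical sketch: a service-wide default that an individual flow's config may override.
final class ConcurrencyPolicy {
  // Property name as quoted in the FLOW_FAILED message in the PR.
  static final String FLOW_ALLOW_CONCURRENT_EXECUTION = "flow.allowConcurrentExecution";

  // Named for what `true` means, and for the fact that it is only a default.
  private final boolean isFlowConcurrencyEnabledByDefault;

  ConcurrencyPolicy(boolean isFlowConcurrencyEnabledByDefault) {
    this.isFlowConcurrencyEnabledByDefault = isFlowConcurrencyEnabledByDefault;
  }

  // The flow-scoped setting wins; the service-level default applies only when the flow is silent.
  boolean allowsConcurrentExecution(Map<String, String> flowConfig) {
    String value = flowConfig.get(FLOW_ALLOW_CONCURRENT_EXECUTION);
    // Simplified parsing: anything other than "true" (case-insensitive) reads as false.
    return value == null ? isFlowConcurrencyEnabledByDefault : Boolean.parseBoolean(value);
  }
}
```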
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+ protected final MetricContext metricContext;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+ private Optional<Meter> skippedFlowsMeter;
+
+ @Inject
+ public SharedFlowMetricsContainer(Config config) {
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), SharedFlowMetricsContainer.class);
+ this.skippedFlowsMeter = Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+ }
+
+ /**
+ * Adds a new FlowGauge to the metric context if one does not already exist for this flow spec
+ */
+ public void addFlowGauge(Spec spec, Config flowConfig, String flowName, String flowGroup) {
Review Comment:
I find widely-employed, non-semantic types like `String` error-prone,
because particular instances are readily confused w/ one another, such as w/
param/arg mis-ordering. since `flowGroup` is less-specific than `flowName`, my
habit is to put it first, which appears to be how `MetricRegistry.name` wants
them as well... and yet they're swapped in this method's params.
let's seek uniform consistency, so a misordering easily jumps out at
maintainers. even better, though, would be to replace the two `String`
params w/ a single `FlowId`
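A minimal sketch of the single-parameter alternative, assuming a small immutable value type. It is named `FlowId` to match the comment, but it is a self-contained stand-in, not Gobblin's own `FlowId` class, whose API is not reproduced here. Fixing the group-then-name order in one constructor means call sites can no longer silently swap the two `String`s.

```java
import java.util.Objects;

// Hypothetical stand-in for the FlowId type suggested in the review (not Gobblin's class).
final class FlowId {
  private final String flowGroup;
  private final String flowName;

  // Group first, then name: the order is fixed here once, for every caller.
  FlowId(String flowGroup, String flowName) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
  }

  String getFlowGroup() { return flowGroup; }

  String getFlowName() { return flowName; }

  // Less-specific component first, matching a group-then-name metric naming order.
  String toMetricNameSuffix() { return flowGroup + "." + flowName; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof FlowId)) return false;
    FlowId other = (FlowId) o;
    return flowGroup.equals(other.flowGroup) && flowName.equals(other.flowName);
  }

  @Override
  public int hashCode() { return Objects.hash(flowGroup, flowName); }

  @Override
  public String toString() { return "FlowId(" + flowGroup + ", " + flowName + ")"; }
}
```

With a type like this, the method under review could take `(Spec spec, Config flowConfig, FlowId flowId)`, so a group/name mix-up would have to happen visibly at construction time rather than in a positional argument list.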
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in a later stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
Review Comment:
I commented elsewhere about the lack of uniform ordering between `flowName` and
`flowGroup` params... as well as a way to sidestep it via the `FlowId` type
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+public final class ExecutionChecksUtil {
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+ ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, flowConcurrencyFlag);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+ sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SKIPPED);
+ Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+ if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ // For ad-hoc flows, we might have already increased quota, so we need to decrease it here
+ for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ quotaManager.releaseQuota(dagNode);
+ }
+ }
+
+ // Send FLOW_FAILED event
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+ if (eventSubmitter.isPresent()) {
+ new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ }
+ return false;
+ }
+ return true;
Review Comment:
not sure where we are in the nesting, so that I've got this right... *but
that's the problem I'm suggesting to solve here!* ;)
how about:
```
if (isExecPermitted(...)) {
return true;
} else {
... // all the many other lines of code
}
```
?
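The reviewer's shape can also be written as a guard clause: return early on the permitted path and drop the `else`, so the skip handling sits at the method's top level. This is a variant of the suggestion, not the PR's code. In the sketch, the permission check and the skip handling are hypothetical functional parameters standing in for the real Gobblin collaborators.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the guard-clause shape; the real method's collaborators
// (FlowStatusGenerator, quota manager, metrics, event submitter) are abstracted away.
final class ExecutionGate {
  // Hypothetical hook for everything that happens when a flow is skipped.
  interface SkipHandler {
    void onSkipped();
  }

  private ExecutionGate() {}

  static boolean isExecutionPermittedHandler(BooleanSupplier isExecutionPermitted, SkipHandler skipHandler) {
    // Happy path first: nothing else to do when the execution is permitted.
    if (isExecutionPermitted.getAsBoolean()) {
      return true;
    }
    // No else needed: this only runs for a skipped flow (in the PR: the warning log,
    // skipped-flows meter, ad-hoc quota release and FLOW_FAILED event).
    skipHandler.onSkipped();
    return false;
  }
}
```

The choice is mostly taste: the `if/else` in the comment keeps both branches visually paired, while the guard clause removes one level of nesting from the longer branch.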
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
Review Comment:
perhaps more specific... `compileToExecutionPlan`?
`createExecutionPlanIfValid`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -206,7 +212,12 @@ public String toString() {
protected final Long defaultJobStartSlaTimeMillis;
@Getter
private final JobStatusRetriever jobStatusRetriever;
- private final Orchestrator orchestrator;
+ private final FlowStatusGenerator flowStatusGenerator;
+ private final UserQuotaManager quotaManager;
+ private final ClassAliasResolver<SpecCompiler> aliasResolver;
Review Comment:
is this used outside the ctor? if not, why an instance member?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
Review Comment:
naming-wise: "container" has a very specific meaning: an exec env that hosts
user-supplied code, e.g. "my app runs in a container" or "container
deployment failed." more rarely, it's a synonym for "collection", especially
the abstract concept, e.g. "Map, List, and PriorityQueue are Java container
types."
this class looks like simply `SharedFlowMetrics`, or the
`SharedFlowMetricsSingleton`, or a `SharedFlowMetricsTracker`...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
- this.orchestrator = orchestrator;
+ this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+ this.flowStatusGenerator = flowStatusGenerator;
+ try {
+ String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+ if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+ specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+ }
+ log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+ this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
+ ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
this ctor is 46 lines long now, with 13 involving initializing this
`SpecCompiler`. let's streamline and minimize the distraction by abstracting
it into a one-line method call, like:
```
this.specCompiler = createSpecCompiler(config);
```
alternatively, is there a way to use the `GobblinConstructorUtils`, like you
did below for the `UserQuotaManager`? e.g. -
https://github.com/apache/gobblin/blob/2917b6311949d25f898a6691fe0730d344299d25/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/GobblinConstructorUtils.java#L110
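A sketch of the extracted helper, which also touches the earlier `aliasResolver` question: alias resolution becomes a local detail of the factory rather than an instance member. It uses plain JDK reflection and does not attempt to reproduce `GobblinConstructorUtils`; `PlanCompiler`, `DefaultPlanCompiler`, the alias table, and the config key are hypothetical stand-ins for `SpecCompiler`, a concrete compiler, `ClassAliasResolver`, and the `ServiceConfigKeys` constant. Note that all five checked exceptions in the current multi-catch extend `ReflectiveOperationException`, so one catch clause covers them.

```java
import java.util.Map;

// Hypothetical stand-in for SpecCompiler.
interface PlanCompiler {
  String describe();
}

// Hypothetical concrete compiler exposing the (config) constructor that reflection looks up.
final class DefaultPlanCompiler implements PlanCompiler {
  private final Map<String, String> config;

  public DefaultPlanCompiler(Map<String, String> config) {
    this.config = config;
  }

  @Override
  public String describe() {
    return "default compiler with " + config.size() + " config entries";
  }
}

final class SpecCompilerFactory {
  // Hypothetical key and alias table, standing in for ServiceConfigKeys and ClassAliasResolver.
  static final String FLOWCOMPILER_CLASS_KEY = "example.flowCompiler.class";
  private static final Map<String, String> ALIASES =
      Map.of("default", DefaultPlanCompiler.class.getName());

  private SpecCompilerFactory() {}

  // Lets the ctor shrink to: this.specCompiler = createSpecCompiler(config);
  static PlanCompiler createSpecCompiler(Map<String, String> config) {
    String classNameOrAlias = config.getOrDefault(FLOWCOMPILER_CLASS_KEY, "default");
    // Resolution is local to this helper, so the caller needs no resolver field.
    String className = ALIASES.getOrDefault(classNameOrAlias, classNameOrAlias);
    try {
      return Class.forName(className)
          .asSubclass(PlanCompiler.class)
          .getConstructor(Map.class)
          .newInstance(config);
    } catch (ReflectiveOperationException e) {
      // Covers ClassNotFound, NoSuchMethod, IllegalAccess, Instantiation and InvocationTarget.
      throw new RuntimeException("Could not construct spec compiler " + className, e);
    }
  }
}
```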
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+ protected final MetricContext metricContext;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
Review Comment:
everywhere below, I see `spec.getUri().toString()`... can't we simply use a
URI as this map's key? doing so additionally carries richer semantics.
also: up to you, but my naming convention here would be
`flowGaugeStateBySpecUri`
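A sketch of the URI-keyed map under the suggested name. `CompiledState` here is a hypothetical stand-in: only `SKIPPED` appears in this PR, and the other constants are assumptions. The update method also assumes that "conditionally" in `conditionallyUpdateFlowGaugeSpecState` means "only if a gauge already exists for the spec"; that reading is inferred from the name, not confirmed by the diff.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: only SKIPPED appears in the PR; the other constants are assumptions.
enum CompiledState { UNKNOWN, SUCCESSFUL, FAILED, SKIPPED }

final class FlowGaugeStates {
  // Keyed by the spec URI itself instead of spec.getUri().toString().
  private final Map<URI, CompiledState> flowGaugeStateBySpecUri = new HashMap<>();

  // Registers a state only when none exists yet, and returns whatever state is current.
  CompiledState addIfAbsent(URI specUri) {
    return flowGaugeStateBySpecUri.computeIfAbsent(specUri, uri -> CompiledState.UNKNOWN);
  }

  // Assumed semantics of "conditionally": update only flows that already have a gauge.
  void conditionallyUpdate(URI specUri, CompiledState newState) {
    flowGaugeStateBySpecUri.computeIfPresent(specUri, (uri, oldState) -> newState);
  }

  CompiledState get(URI specUri) {
    return flowGaugeStateBySpecUri.get(specUri);
  }
}
```

Two `URI` instances parsed from the same string compare equal and hash alike, so lookups keyed by `spec.getUri()` behave like the current string keys while keeping the richer type.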
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+ */
+ public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+ SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+ Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+ boolean flowConcurrencyFlag, FlowSpec flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
+ SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
+ FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec,
+ String flowName, String flowGroup) throws IOException {
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+ ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, flowConcurrencyFlag);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+ sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SKIPPED);
+ Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+ if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
Review Comment:
NBD... but I do imagine it being clearer to read:
```
if (!isScheduledFlow(spec)) ...
```
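A possible shape for the helper, assuming it takes the spec's `Properties` (as returned by `getConfigAsProperties()` in the diff) and that `"job.schedule"` stands in for `ConfigurationKeys.JOB_SCHEDULE_KEY`; both the `FlowScheduleChecks` class name and the local constant are illustrative:

```java
import java.util.Properties;

// Hypothetical helper class; in the PR the Properties would come from
// ((FlowSpec) spec).getConfigAsProperties().
final class FlowScheduleChecks {
  // Assumed stand-in for ConfigurationKeys.JOB_SCHEDULE_KEY
  static final String JOB_SCHEDULE_KEY = "job.schedule";

  private FlowScheduleChecks() {}

  /** A flow is treated as scheduled iff its config carries a schedule; otherwise it is ad hoc. */
  static boolean isScheduledFlow(Properties flowProps) {
    return flowProps.containsKey(JOB_SCHEDULE_KEY);
  }
}
```

The condition in the diff is negated (quota is released only for ad-hoc flows), so the call site would read `if (!isScheduledFlow(...))`.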
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+ log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup, flowName);
Review Comment:
(I notice we log them as group, then name, so that may be the order maintainers become most familiar with.)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+ if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+ sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SKIPPED);
+ Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+ if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ // For ad-hoc flow, we might already increase quota, we need to decrease here
+ for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ quotaManager.releaseQuota(dagNode);
+ }
+ }
+
+ // Send FLOW_FAILED event
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+ + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+ if (eventSubmitter.isPresent()) {
+ new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Check if a FlowSpec instance is allowed to run.
+ *
+ * @param flowName
+ * @param flowGroup
+ * @param allowConcurrentExecution
+ * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
+ */
+ private static boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName,
+ String flowGroup, boolean allowConcurrentExecution) {
+ if (allowConcurrentExecution) {
+ return true;
+ } else {
+ return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+ }
Review Comment:
nit:
```
return allowConcurrent || !fsg.isFlowRunning(...)
```
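For reference, a self-contained sketch of the one-liner, with a hypothetical `FlowRunningCheck` interface standing in for `FlowStatusGenerator` and a hypothetical `ExecutionPermission` holder class. Because `||` short-circuits, the status lookup only runs when concurrent executions are disallowed, which matches the behavior of the original `if`/`else`:

```java
// Hypothetical functional stand-in for FlowStatusGenerator#isFlowRunning(flowName, flowGroup).
interface FlowRunningCheck {
  boolean isFlowRunning(String flowName, String flowGroup);
}

// Hypothetical holder class for the simplified method.
final class ExecutionPermission {
  private ExecutionPermission() {}

  // Same result as the if/else in the diff; || skips the status lookup
  // entirely when concurrent executions are allowed.
  static boolean isExecutionPermitted(FlowRunningCheck flowStatus, String flowName, String flowGroup,
      boolean allowConcurrentExecution) {
    return allowConcurrentExecution || !flowStatus.isFlowRunning(flowName, flowGroup);
  }
}
```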
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+ protected final MetricContext metricContext;
+ private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+ private Optional<Meter> skippedFlowsMeter;
+
+ @Inject
+ public SharedFlowMetricsContainer(Config config) {
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), SharedFlowMetricsContainer.class);
+ this.skippedFlowsMeter = Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+ }
+
+ /**
+ * Adds a new FlowGauge to the metric context if one does not already exist for this flow spec
+ */
+ public void addFlowGauge(Spec spec, Config flowConfig, String flowName, String flowGroup) {
+ // Only register the metric of flows that are scheduled, run once flows should not be tracked indefinitely
+ if (!flowGauges.containsKey(spec.getUri().toString()) && flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ String flowCompiledGaugeName =
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
+ flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
+ ContextAwareGauge<Integer> gauge =
+ RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName,
+ () -> flowGauges.get(spec.getUri().toString()).state.value);
+ RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+ }
+ }
+ /**
+ * Updates the flowgauge related to the spec if the gauge is being tracked for the flow
+ * @param spec FlowSpec to be updated
+ * @param state desired state to set the gauge
+ */
+ public void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState state) {
+ if (flowGauges.containsKey(spec.getUri().toString())) {
+ flowGauges.get(spec.getUri().toString()).setState(state);
+ }
+ }
+
+ @Setter
+ public static class FlowCompiledState {
Review Comment:
nit: given this is referenced above, I'd suggest introducing it beforehand by moving this definition to the top of the enclosing class.
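A layout-only sketch of what this could look like, with the nested types declared before the field that uses them. The `SharedFlowMetricsLayoutSketch` name, the enum values, the initial `UNKNOWN` state and the `track`/`gaugeValue` helpers are assumptions for illustration, and a hand-written setter replaces Lombok's `@Setter` so the block compiles on its own:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical layout sketch: nested types first, then the fields and methods that use them.
class SharedFlowMetricsLayoutSketch {

  // Enum values are assumptions, not taken from the PR.
  enum CompiledState {
    FAILED(0), SUCCESSFUL(1), SKIPPED(2), UNKNOWN(3);

    final int value;

    CompiledState(int value) {
      this.value = value;
    }
  }

  static class FlowCompiledState {
    // Initial state is an assumption for this sketch.
    private CompiledState state = CompiledState.UNKNOWN;

    void setState(CompiledState state) {
      this.state = state;
    }

    int value() {
      return state.value;
    }
  }

  // Field declarations now follow the types they depend on.
  private final Map<String, FlowCompiledState> flowGauges = new HashMap<>();

  void track(String specUri) {
    flowGauges.putIfAbsent(specUri, new FlowCompiledState());
  }

  void conditionallyUpdateFlowGaugeSpecState(String specUri, CompiledState state) {
    FlowCompiledState gauge = flowGauges.get(specUri);
    if (gauge != null) {
      gauge.setState(state);
    }
  }

  Integer gaugeValue(String specUri) {
    FlowCompiledState gauge = flowGauges.get(specUri);
    return gauge == null ? null : gauge.value();
  }
}
```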
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+ flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ specCompiler.awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
Review Comment:
Is this different from what just happened inside `isExecutionPermittedHandler`? If not, rather than repeat it, let's return `Optional<Dag<JobExecutionPlan>>` from the latter instead of the boolean. If there's a reason NOT to do that, then please add a comment here describing the pitfall, so a maintainer gets a clear idea of what to watch out for before potentially trying that.
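One possible shape for the suggested refactor, written generically so it compiles on its own: `S` stands in for `FlowSpec`, `D` for `Dag<JobExecutionPlan>`, `onSkipped` for the quota release done on the skip path, and `java.util.Optional` replaces Guava's `Optional`; `CompileOnceSketch` and `compileIfPermitted` are hypothetical names. One thing worth ruling out first: in the diff, the compile inside `isExecutionPermittedHandler` happens before `specCompiler.awaitHealthy()`, which may be the reason a second compile exists.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of "return the compiled DAG instead of a boolean".
final class CompileOnceSketch {
  private CompileOnceSketch() {}

  /**
   * Compiles the spec exactly once. If execution is not permitted, the compiled result is handed to
   * onSkipped (e.g. to release quota) and an empty Optional is returned; otherwise the same result
   * is returned so the caller does not need to compile again.
   */
  static <S, D> Optional<D> compileIfPermitted(S spec, Function<S, D> compiler,
      Predicate<S> executionPermitted, Consumer<D> onSkipped) {
    D compiled = compiler.apply(spec);
    if (!executionPermitted.test(spec)) {
      if (compiled != null) {
        onSkipped.accept(compiled);
      }
      return Optional.empty();
    }
    // May still be empty if compilation produced nothing (the null-DAG case in the diff).
    return Optional.ofNullable(compiled);
  }
}
```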
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in a later stage of multi-active development so we
+ * do not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
Review Comment:
nit: a "handler" is usually a class or a func/method reference invoked as a
callback. this also isn't really a predicate (e.g. `isXYZ`), as it performs so
many side-effects, like quota mgmt, etc.
maybe `validateExecutionOrFinalizeRejection`? (sorry, you're likely to find
a better name)
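A minimal sketch of the split this nit points at, assuming only the concurrency check is modeled (the real method also handles quota management and flow metrics): a side-effect-free `isXYZ` predicate, plus a separately named method that validates and, on rejection, finalizes it. `ExecutionGate`, `runningFlows`, and `rejectionEmitter` are hypothetical names for illustration; only `validateExecutionOrFinalizeRejection` comes from the comment above.

```java
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model: flows are identified by "group.name" strings and
 * only the concurrency check is modeled (no quota management).
 */
final class ExecutionGate {
  private final Set<String> runningFlows;          // flows assumed to be executing right now
  private final Consumer<String> rejectionEmitter; // stand-in for emitting a FLOW_FAILED event

  ExecutionGate(Set<String> runningFlows, Consumer<String> rejectionEmitter) {
    this.runningFlows = runningFlows;
    this.rejectionEmitter = rejectionEmitter;
  }

  /** Pure predicate: no side effects, so the "is" prefix is accurate. */
  boolean isExecutionPermitted(String flowId, boolean allowConcurrentExecution) {
    return allowConcurrentExecution || !runningFlows.contains(flowId);
  }

  /**
   * Validates the execution and, if it is rejected, finalizes the rejection by emitting an event.
   * The name signals the side effect that the predicate above does not have.
   */
  boolean validateExecutionOrFinalizeRejection(String flowId, boolean allowConcurrentExecution) {
    if (isExecutionPermitted(flowId, allowConcurrentExecution)) {
      return true;
    }
    rejectionEmitter.accept(flowId);
    return false;
  }
}
```

Keeping the predicate pure means callers (and tests) can ask "would this run?" without accidentally emitting events or touching quota.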
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+public final class ExecutionChecksUtil {
Review Comment:
`FlowCompilationValidationHelper`?
also, I completely agree w/ the statelessness (and `final`ity) of this
abstraction, but just for comprehensibility I'd suggest creating a ctor merely
to separate "the args that stay the same" from "the changing ones", to put the
latter front and center.
e.g. so if not called like this:
```
this.fcHelper = new FlowCompilationValidationHelper(
sharedFlowMetricsContainer, specCompiler, quotaManager,
optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled);
... // way later on
Optional<Dag<JobExecutionPlan>> optDag =
fcHelper.createExecutionPlanIfValid(flowSpec);
```
then at least like this:
```
Optional<Dag<JobExecutionPlan>> optDag = new FlowCompilationValidationHelper(
sharedFlowMetricsContainer, specCompiler, quotaManager,
optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled)
.createExecutionPlanIfValid(flowSpec);
```
p.s. populating the compilation failure message could remain `static`
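A self-contained sketch of the ctor-based shape suggested above, assuming heavily simplified stand-ins: `SimpleFlowSpec` replaces `FlowSpec`, a `List<String>` of job names replaces `Dag<JobExecutionPlan>`, and the two functional parameters stand in for the execution-permitted checks and `SpecCompiler.compileFlow`. All of those, plus the failure-message text, are hypothetical; `FlowCompilationValidationHelper` and `createExecutionPlanIfValid` are the names proposed in the review. `java.util.Optional` is used in place of Guava's `Optional` so the sketch needs no dependencies.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for FlowSpec: just the flow group and name. */
final class SimpleFlowSpec {
  final String flowGroup;
  final String flowName;

  SimpleFlowSpec(String flowGroup, String flowName) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }
}

/**
 * Still final and free of mutable state, but the collaborators that stay the same across calls
 * are fixed once in the ctor, so the per-call argument (the flow spec) is front and center.
 */
final class FlowCompilationValidationHelper {
  // Hypothetical stand-in for the concurrency/quota checks.
  private final Predicate<SimpleFlowSpec> executionPermitted;
  // Hypothetical stand-in for SpecCompiler.compileFlow; returns job names instead of a Dag.
  private final Function<SimpleFlowSpec, List<String>> compiler;

  FlowCompilationValidationHelper(Predicate<SimpleFlowSpec> executionPermitted,
      Function<SimpleFlowSpec, List<String>> compiler) {
    this.executionPermitted = executionPermitted;
    this.compiler = compiler;
  }

  /** Returns the compiled plan, or empty if execution is not permitted or compilation yields nothing. */
  Optional<List<String>> createExecutionPlanIfValid(SimpleFlowSpec flowSpec) {
    if (!executionPermitted.test(flowSpec)) {
      return Optional.empty();
    }
    List<String> plan = compiler.apply(flowSpec);
    if (plan == null || plan.isEmpty()) {
      // Stand-in for emitting a compilation-failed event.
      System.err.println(compilationFailedMessage(flowSpec));
      return Optional.empty();
    }
    return Optional.of(plan);
  }

  /** Uses no instance state, so it can stay static (per the p.s. above). */
  static String compilationFailedMessage(SimpleFlowSpec flowSpec) {
    return "Flow " + flowSpec.flowGroup + "." + flowSpec.flowName + " compiled to an empty plan";
  }
}
```

Usage mirrors the second snippet in the comment: `new FlowCompilationValidationHelper(permitted, compiler).createExecutionPlanIfValid(flowSpec)`. Because the stable collaborators sit in `final` fields and nothing is mutated between calls, the helper can be built once and shared, or built inline per call, without changing behavior.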
Issue Time Tracking
-------------------
Worklog Id: (was: 874724)
Time Spent: 0.5h (was: 20m)
> Refactor Common Utils between Orchestrator & DagManager
> -------------------------------------------------------
>
> Key: GOBBLIN-1868
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1868
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Create a Util class to contain functionality re-used between the DagManager
> and Orchestrator when launching executions of a flow spec. In the common
> case, the Orchestrator receives a flow to orchestrate, performs necessary
> validations, and forwards the execution responsibility to the DagManager. The
> DagManager's responsibility is to carry out any flow action requests.
> However, with launch executions now being stored in the DagActionStateStore,
> on restart or leadership change the DagManager has to perform validations
> before executing any launch actions the previous leader was unable to
> complete. Rather than duplicating the code or introducing a circular
> dependency between the DagManager and Orchestrator, this class is utilized to
> store the common functionality. It is stateless and requires all stateful
> pieces to be passed as input from the caller.
> * Note: We expect further refactoring to be done to the DagManager in a later
> stage of multi-active development so we do not attempt *major* reorganization
> as abstractions may change.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)