umustafi commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284875644


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer 
sharedFlowMetricsContainer,

Review Comment:
   renamed to `validateAndHandleConcurrentExecution` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to