phet commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284899863


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Helper class with functionality meant to be re-used between the DagManager 
and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateful,
+ * requiring all stateful pieces to be passed as input from the caller upon 
instantiating the helper.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development, so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+@Slf4j
+@AllArgsConstructor
+public final class FlowCompilationValidationHelper {
+  private SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
+  private SpecCompiler specCompiler;
+  private UserQuotaManager quotaManager;
+  private Optional<EventSubmitter> eventSubmitter;
+  private FlowStatusGenerator flowStatusGenerator;
+  private boolean isFlowConcurrencyEnabled;
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec 
flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+        validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, 
flowName);
+    if (!jobExecutionPlanDagOptional.isPresent()) {
+      return Optional.absent();
+    }
+
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDagOptional.get() == null || 
jobExecutionPlanDagOptional.get().isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDagOptional.get());
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }

Review Comment:
   is this timing still meaningful, given that 
`validateAndHandleConcurrentExecution` was called before even starting it?  
also, is it OK that it's never stopped in the error case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to