This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 913d580fe [GOBBLIN-1984] Add consensus flowExecutionId to FlowSpec to use for compilation (#3857)
913d580fe is described below

commit 913d580fe19e7a6adc60d3294bac44e5a32d7bba
Author: umustafi <[email protected]>
AuthorDate: Tue Jan 16 10:05:42 2024 -0800

    [GOBBLIN-1984] Add consensus flowExecutionId to FlowSpec to use for compilation (#3857)
    
    * Add consensus flowExecutionId to FlowSpec to use for compilation
    
    * Use FlowSpec instead of generic Spec type for fields
    
    * Address consistency issue between properties and remove redundant casting
    
    * Make volatile fields private
    
    * Add one more clarifying comment
    
    * Update comment
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../service/FlowConfigResourceLocalHandler.java    |  2 +-
 .../org/apache/gobblin/runtime/api/FlowSpec.java   | 37 +++++++++++-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |  6 +-
 .../apache/gobblin/runtime/api/FlowSpecTest.java   | 66 ++++++++++++++++++++++
 .../modules/orchestration/Orchestrator.java        |  9 ++-
 .../scheduler/GobblinServiceJobScheduler.java      |  2 +-
 .../utils/FlowCompilationValidationHelper.java     | 57 ++++++-------------
 .../monitoring/DagActionStoreChangeMonitor.java    | 11 +++-
 .../utils/FlowCompilationValidationHelperTest.java | 11 ++--
 9 files changed, 138 insertions(+), 63 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 6e7b9b082..fa0b2f46c 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -95,7 +95,7 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
 
     try {
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
-      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      FlowSpec spec = flowCatalog.getSpecs(flowUri);
       return FlowSpec.Utils.toFlowConfig(spec);
     } catch (URISyntaxException e) {
       throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad 
URI " + flowId.getFlowName(), e);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 4b77294e6..a74b9fd0e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
 import com.linkedin.data.template.StringMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -35,6 +36,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
@@ -56,6 +58,7 @@ import org.apache.gobblin.util.ConfigUtils;
  *
  */
 @Alpha
+@AllArgsConstructor
 @Data
 @EqualsAndHashCode(exclude={"compilationErrors"})
 @SuppressFBWarnings(value="SE_BAD_FIELD",
@@ -75,13 +78,23 @@ public class FlowSpec implements Configurable, Spec {
   /** Human-readable description of the flow spec */
   final String description;
 
-  /** Flow config as a typesafe config object */
-  final Config config;
+  /* Note that since getConfig() and getConfigAsProperties() are independent accessors, `volatile` doesn't ensure a
+  * consistent view between them. To read both consistently, briefly synchronize on the FlowSpec object (the same
+  * monitor held by the synchronized addProperty()) while obtaining them:
+  *    FlowSpec fs = ...
+  *    synchronized (fs) {
+  *      fs.getConfig();
+  *      fs.getConfigAsProperties();
+  *    }
+  */
+
+  /** Flow config as a typesafe config object which can be replaced */
+  private volatile Config config;
 
   /** Flow config as a properties collection for backwards compatibility */
   // Note that this property is not strictly necessary as it can be generated 
from the typesafe
   // config. We use it as a cache until typesafe config is more widely adopted 
in Gobblin.
-  final Properties configAsProperties;
+  private volatile Properties configAsProperties;
 
   /** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
   final Optional<Set<URI>> templateURIs;
@@ -125,6 +138,24 @@ public class FlowSpec implements Configurable, Spec {
     }
   }
 
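+  /**
+   * Add a property to the typesafe Config and propagate it to the Properties field. These two fields should only be
+   * modified through this method to prevent inconsistency between them. While the property is being added, config and
+   * configAsProperties may briefly hold different values, but they are consistent by the time this method returns.
+   * NOTE(review): unless suppressed elsewhere, Lombok @Data also generates public setters for these non-final fields.
+   * @param key property key to add (or overwrite) in both the Config and the Properties representations
+   * @param value property value to associate with {@code key}
+   */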
+  public synchronized void addProperty(String key, String value) {
+    this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
+    /* Make sure configAsProperties has been initialized. If it's just 
initialized, setting the property will be a
+    redundant operation. However, if it already existed we need to update/add 
the key-value pair.
+     */
+    this.getConfigAsProperties();
+    this.configAsProperties.setProperty(key, value);
+
+  }
+
   public void addCompilationError(String src, String dst, String errorMessage, 
int numberOfHops) {
     this.compilationErrors.add(new CompilationError(getConfig(), src, dst, 
errorMessage, numberOfHops));
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 0fe6f2583..67c49ff36 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -296,11 +296,11 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
   }
 
   @Override
-  public Spec getSpecs(URI uri) throws SpecNotFoundException {
+  public FlowSpec getSpecs(URI uri) throws SpecNotFoundException {
     try {
-      return specStore.getSpec(uri);
+      return (FlowSpec) specStore.getSpec(uri);
     } catch (IOException e) {
-      throw new RuntimeException("Cannot retrieve Spec from Spec store for 
URI: " + uri, e);
+      throw new RuntimeException("Cannot retrieve FlowSpec from FlowSpec store 
for URI: " + uri, e);
     }
   }
 
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
new file mode 100644
index 000000000..539f42d53
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowId;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FlowSpecTest {
+
+  /**
+   * Tests that addProperty() updates the flowSpec in place, so that both its Config and its Properties retain the
+   * original properties and also contain the newly added one
+   * @throws URISyntaxException if the flow spec URI cannot be created
+   */
+  @Test
+  public void testAddProperty() throws URISyntaxException {
+    String flowGroup = "myGroup";
+    String flowName = "myName";
+    String flowExecutionId = "1234";
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+
+    // Create properties to be used as config
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
+    properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+    properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, 
"true");
+
+    FlowSpec flowSpec = 
FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
+    flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId);
+
+    Properties updatedProperties = flowSpec.getConfigAsProperties();
+    
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
 flowExecutionId);
+    
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY),
 flowGroup);
+    
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY),
 flowName);
+    
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
 "true");
+
+    Config updatedConfig = flowSpec.getConfig();
+    
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
 flowExecutionId);
+    
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), 
flowGroup);
+    
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), 
flowName);
+    
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
 "true");
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 85c8248fb..ad9956458 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -255,7 +255,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       } else {
         TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
         Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-            
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 spec, flowGroup,
+            
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
                 flowName);
 
         if (!compiledDagOptional.isPresent()) {
@@ -264,7 +264,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         }
         Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
         if (compiledDag.isEmpty()) {
-          
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 spec, flowMetadata);
+          
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 flowSpec, flowMetadata);
           Instrumented.markMeter(this.flowOrchestrationFailedMeter);
           
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
               SharedFlowMetricsSingleton.CompiledState.FAILED);
@@ -288,10 +288,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, 
DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
+  public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, 
InterruptedException {
     Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
-            Optional.of(flowAction.getFlowExecutionId()));
+        
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
     if (optionalJobExecutionPlanDag.isPresent()) {
       submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
     } else {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index cd1bd421f..1253858ea 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -642,7 +642,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
       unscheduleJob(specURI.toString());
       try {
-          FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
+          FlowSpec spec = this.flowCatalog.get().getSpecs(specURI);
           Properties properties = spec.getConfigAsProperties();
           _log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled 
Spec");
         } catch (SpecNotFoundException e) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 674ff0024..cda69ebd9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -28,7 +28,6 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
@@ -65,13 +64,10 @@ public final class FlowCompilationValidationHelper {
    * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
    * caller.
    * @param flowSpec
-   * @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass 
the ID "laundered" via the DB;
-   *                                see: {@link 
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section 
titled
-   *                                `Database event_timestamp laundering`}
    * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
    */
-  public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec 
flowSpec,
-      Optional<String> optionalFlowExecutionId) throws IOException, 
InterruptedException {
+  public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec 
flowSpec)
+      throws IOException, InterruptedException {
     Config flowConfig = flowSpec.getConfig();
     String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
     String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -93,7 +89,7 @@ public final class FlowCompilationValidationHelper {
       return Optional.absent();
     }
 
-    addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, 
jobExecutionPlanDagOptional.get());
+    addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDagOptional.get());
     flowCompilationTimer.stop(flowMetadata);
     return jobExecutionPlanDagOptional;
   }
@@ -101,25 +97,25 @@ public final class FlowCompilationValidationHelper {
   /**
    * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
    * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
-   * @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow 
and compile spec, else absent Optional
+   * @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow 
and compile flowSpec, else Optional.absent()
    * @throws IOException
    */
-  public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Config flowConfig, Spec spec,
+  public Optional<Dag<JobExecutionPlan>> 
validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
       String flowGroup, String flowName) throws IOException {
     boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
isFlowConcurrencyEnabled);
 
-    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
 
     if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution)) {
       return Optional.fromNullable(jobExecutionPlanDag);
     } else {
       log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
           + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
-      sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+      
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
           SharedFlowMetricsSingleton.CompiledState.SKIPPED);
       
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
-      if (!isScheduledFlow((FlowSpec) spec)) {
+      if (!flowSpec.isScheduled()) {
         // For ad-hoc flow, we might already increase quota, we need to 
decrease here
         for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
           quotaManager.releaseQuota(dagNode);
@@ -127,9 +123,9 @@ public final class FlowCompilationValidationHelper {
       }
 
       // Send FLOW_FAILED event
-      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
       flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
-          + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
+          + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flowSpec to change this behaviour.");
       new TimingEvent(eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
       return Optional.absent();
     }
@@ -150,11 +146,11 @@ public final class FlowCompilationValidationHelper {
 
   /**
    * Abstraction used to populate the message of and emit a FlowCompileFailed 
event for the Orchestrator.
-   * @param spec
+   * @param flowSpec
    * @param flowMetadata
    */
-  public static void populateFlowCompilationFailedEventMessage(EventSubmitter 
eventSubmitter, Spec spec,
-      Map<String, String> flowMetadata) {
+  public static void populateFlowCompilationFailedEventMessage(EventSubmitter 
eventSubmitter,
+      FlowSpec flowSpec, Map<String, String> flowMetadata) {
     // For scheduled flows, we do not insert the flowExecutionId into the 
FlowSpec. As a result, if the flow
     // compilation fails (i.e. we are unable to find a path), the metadata 
will not have flowExecutionId.
     // In this case, the current time is used as the flow executionId.
@@ -162,8 +158,8 @@ public final class FlowCompilationValidationHelper {
         Long.toString(System.currentTimeMillis()));
 
     String message = "Flow was not compiled successfully.";
-    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
-      message = message + " Compilation errors encountered: " + ((FlowSpec) 
spec).getCompilationErrors();
+    if (!flowSpec.getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + 
flowSpec.getCompilationErrors();
     }
     flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
 
@@ -171,32 +167,13 @@ public final class FlowCompilationValidationHelper {
   }
 
   /**
-   * If it is a scheduled flow (which does not have flowExecutionId in the 
FlowSpec) and the flow compilation is
-   * successful, retrieve flowExecutionId from the JobSpec.
+   * If it is a scheduled flow run without multi-active scheduler configuration (where the FlowSpec does not have a
+   * flowExecutionId) and the flow compilation is successful, copy the first JobSpec's flowExecutionId into flowMetadata.
    */
   public static void addFlowExecutionIdIfAbsent(Map<String,String> 
flowMetadata,
       Dag<JobExecutionPlan> jobExecutionPlanDag) {
-    addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(), 
jobExecutionPlanDag);
-  }
-
-  /**
-   * If it is a scheduled flow (which does not have flowExecutionId in the 
FlowSpec) and the flow compilation is
-   * successful, add a flowExecutionId using the optional parameter if it 
exists otherwise retrieve it from the JobSpec.
-   */
-  public static void addFlowExecutionIdIfAbsent(Map<String,String> 
flowMetadata,
-      Optional<String> optionalFlowExecutionId, Dag<JobExecutionPlan> 
jobExecutionPlanDag) {
-    if (optionalFlowExecutionId.isPresent()) {
-      
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
 optionalFlowExecutionId.get());
-    }
     
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
         
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
             ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
   }
-
-  /**
-   * Return true if the spec contains a schedule, false otherwise.
-   */
-  public static boolean isScheduledFlow(FlowSpec spec) {
-    return 
spec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY);
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 647bb8089..a6f11e08f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -34,6 +34,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
@@ -275,9 +276,13 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     FlowSpec spec = null;
     try {
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
-      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
-      // Pass flowExecutionId to DagManager to be used for scheduled flows 
that do not already contain a flowExecutionId
-      this.orchestrator.submitFlowToDagManager(spec, dagAction);
+      spec = flowCatalog.getSpecs(flowUri);
+      /* Update the spec to contain the flowExecutionId from the dagAction for scheduled flows that do not already
+      contain a flowExecutionId. Ad-hoc flowSpecs are already consistent with the dagAction, so there's no effective
+      change. It's crucial to adopt the consensus flowExecutionId here to prevent creating a new one during compilation.
+      NOTE(review): this mutates the FlowSpec returned by flowCatalog; confirm the spec store never hands out a shared instance. */
+      spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+      this.orchestrator.submitFlowToDagManager(spec);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {}. Exception {}", 
flowId, e.getMessage());
       launchSubmissionMetricProxy.markFailure();
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
index 5f778f3f8..37ab711f0 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.utils;
 
-import com.google.common.base.Optional;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import org.apache.gobblin.metrics.event.TimingEvent;
@@ -35,7 +34,6 @@ import org.testng.annotations.Test;
 public class FlowCompilationValidationHelperTest {
   private String dagId = "testDag";
   private Long jobSpecFlowExecutionId = 1234L;
-  private String newFlowExecutionId = "5678";
   private String existingFlowExecutionId = "9999";
   private Dag<JobExecutionPlan> jobExecutionPlanDag;
 
@@ -46,14 +44,13 @@ public class FlowCompilationValidationHelperTest {
   }
 
   /*
-    Tests that addFlowExecutionIdIfAbsent adds flowExecutionId to a 
flowMetadata object when it is absent, prioritizing
-    the optional flowExecutionId over the one from the job spec
+    Tests that addFlowExecutionIdIfAbsent adds the jobSpec flowExecutionId to 
a flowMetadata object when it is absent
    */
   @Test
   public void testAddFlowExecutionIdWhenAbsent() {
     HashMap<String, String> flowMetadata = new HashMap<>();
-    FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
Optional.of(newFlowExecutionId), jobExecutionPlanDag);
-    
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
 newFlowExecutionId);
+    FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDag);
+    
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
 String.valueOf(jobSpecFlowExecutionId));
   }
 
   /*
@@ -63,7 +60,7 @@ public class FlowCompilationValidationHelperTest {
   public void testSkipAddingFlowExecutionIdWhenPresent() {
     HashMap<String, String> flowMetadata = new HashMap<>();
     flowMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
existingFlowExecutionId);
-    FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
Optional.of(newFlowExecutionId), jobExecutionPlanDag);
+    
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,jobExecutionPlanDag);
     
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
 existingFlowExecutionId);
   }
 }

Reply via email to