This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 913d580fe [GOBBLIN-1984] Add consensus flowExecutionId to FlowSpec to
use for compilation (#3857)
913d580fe is described below
commit 913d580fe19e7a6adc60d3294bac44e5a32d7bba
Author: umustafi <[email protected]>
AuthorDate: Tue Jan 16 10:05:42 2024 -0800
[GOBBLIN-1984] Add consensus flowExecutionId to FlowSpec to use for
compilation (#3857)
* Add consensus flowExecutionId to FlowSpec to use for compilation
* Use FlowSpec instead of generic Spec type for fields
* Address consistency issue between config and configAsProperties and remove redundant casting
* Make volatile fields private
* Add one more clarifying comment
* Update comment
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../service/FlowConfigResourceLocalHandler.java | 2 +-
.../org/apache/gobblin/runtime/api/FlowSpec.java | 37 +++++++++++-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 6 +-
.../apache/gobblin/runtime/api/FlowSpecTest.java | 66 ++++++++++++++++++++++
.../modules/orchestration/Orchestrator.java | 9 ++-
.../scheduler/GobblinServiceJobScheduler.java | 2 +-
.../utils/FlowCompilationValidationHelper.java | 57 ++++++-------------
.../monitoring/DagActionStoreChangeMonitor.java | 11 +++-
.../utils/FlowCompilationValidationHelperTest.java | 11 ++--
9 files changed, 138 insertions(+), 63 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 6e7b9b082..fa0b2f46c 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -95,7 +95,7 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
- FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ FlowSpec spec = flowCatalog.getSpecs(flowUri);
return FlowSpec.Utils.toFlowConfig(spec);
} catch (URISyntaxException e) {
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad
URI " + flowId.getFlowName(), e);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 4b77294e6..a74b9fd0e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import com.linkedin.data.template.StringMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URI;
import java.net.URISyntaxException;
@@ -35,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
@@ -56,6 +58,7 @@ import org.apache.gobblin.util.ConfigUtils;
*
*/
@Alpha
+@AllArgsConstructor
@Data
@EqualsAndHashCode(exclude={"compilationErrors"})
@SuppressFBWarnings(value="SE_BAD_FIELD",
@@ -75,13 +78,23 @@ public class FlowSpec implements Configurable, Spec {
/** Human-readable description of the flow spec */
final String description;
- /** Flow config as a typesafe config object */
- final Config config;
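+ /* Note that since getConfig() and getConfigAsProperties() are independent
accessors, `volatile` doesn't ensure a
+ * consistent view between them. If one wants to access both, they should
briefly synchronize on the FlowSpec object
+ * while obtaining them (addProperty() is synchronized on the FlowSpec, so this
excludes a concurrent update in between):
+ * FlowSpec fs = ...
+ * synchronized (fs) {
+ * fs.getConfig();
+ * fs.getConfigAsProperties();
+ * }
+ * Also note that addProperty() updates an existing Properties object in place, so a
Properties reference obtained earlier will observe later additions.
+ */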
+
+ /** Flow config as a typesafe config object which can be replaced */
+ private volatile Config config;
/** Flow config as a properties collection for backwards compatibility */
// Note that this property is not strictly necessary as it can be generated
from the typesafe
// config. We use it as a cache until typesafe config is more widely adopted
in Gobblin.
- final Properties configAsProperties;
+ private volatile Properties configAsProperties;
/** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
final Optional<Set<URI>> templateURIs;
@@ -125,6 +138,24 @@ public class FlowSpec implements Configurable, Spec {
}
}
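+ /**
+ * Adds a property to the Config (also propagated to the Properties field). These
two fields should only be modified through
+ * this method to prevent inconsistency between them.
+ * Note that while the property is being added, config and configAsProperties
can have different values, but they will
+ * be consistent by the time the method returns. An existing value for {@code key} is overwritten.
+ * NOTE(review): @Data on this class also generates public setConfig()/setConfigAsProperties()
setters for these non-final fields, which bypass this method; consider suppressing them.
+ * @param key property key to add or overwrite
+ * @param value property value
+ */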
+ public synchronized void addProperty(String key, String value) {
+ this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
+ /* Make sure configAsProperties has been initialized. If it was only just
initialized here (presumably from the config updated above), setting the property below is a
+ redundant operation. However, if it already existed, we need to update/add
the key-value pair.
+ */
+ this.getConfigAsProperties();
+ this.configAsProperties.setProperty(key, value);
+
+ }
+
public void addCompilationError(String src, String dst, String errorMessage,
int numberOfHops) {
this.compilationErrors.add(new CompilationError(getConfig(), src, dst,
errorMessage, numberOfHops));
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 0fe6f2583..67c49ff36 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -296,11 +296,11 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
}
@Override
- public Spec getSpecs(URI uri) throws SpecNotFoundException {
+ public FlowSpec getSpecs(URI uri) throws SpecNotFoundException {
try {
- return specStore.getSpec(uri);
+ return (FlowSpec) specStore.getSpec(uri);
} catch (IOException e) {
- throw new RuntimeException("Cannot retrieve Spec from Spec store for
URI: " + uri, e);
+ throw new RuntimeException("Cannot retrieve FlowSpec from FlowSpec store
for URI: " + uri, e);
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
new file mode 100644
index 000000000..539f42d53
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowId;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FlowSpecTest {
+
+ /**
+ * Tests that addProperty() updates the flowSpec in place so that both its Properties and its Config
+ * contain the original properties as well as the newly added flowExecutionId.
+ * @throws URISyntaxException
+ */
+ @Test
+ public void testAddProperty() throws URISyntaxException {
+ String flowGroup = "myGroup";
+ String flowName = "myName";
+ String flowExecutionId = "1234";
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+
+ // Create properties to be used as config
+ Properties properties = new Properties();
+ properties.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
+ properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+ properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
"true");
+
+ FlowSpec flowSpec =
FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
+ flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId);
+
+ Properties updatedProperties = flowSpec.getConfigAsProperties();
+
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
flowExecutionId);
+
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
+
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY),
flowName);
+
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
"true");
+
+ Config updatedConfig = flowSpec.getConfig();
+
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
flowExecutionId);
+
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
+
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY),
flowName);
+
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY),
"true");
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 85c8248fb..ad9956458 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -255,7 +255,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
} else {
TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
spec, flowGroup,
+
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
flowName);
if (!compiledDagOptional.isPresent()) {
@@ -264,7 +264,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
if (compiledDag.isEmpty()) {
-
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
spec, flowMetadata);
+
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
flowSpec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
SharedFlowMetricsSingleton.CompiledState.FAILED);
@@ -288,10 +288,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
- public void submitFlowToDagManager(FlowSpec flowSpec,
DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
+ public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException,
InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
- Optional.of(flowAction.getFlowExecutionId()));
+
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index cd1bd421f..1253858ea 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -642,7 +642,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
unscheduleJob(specURI.toString());
try {
- FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
+ FlowSpec spec = this.flowCatalog.get().getSpecs(specURI);
Properties properties = spec.getConfigAsProperties();
_log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled
Spec");
} catch (SpecNotFoundException e) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 674ff0024..cda69ebd9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -28,7 +28,6 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
@@ -65,13 +64,10 @@ public final class FlowCompilationValidationHelper {
* flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
* caller.
* @param flowSpec
- * @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass
the ID "laundered" via the DB;
- * see: {@link
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section
titled
- * `Database event_timestamp laundering`}
* @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
*/
- public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec
flowSpec,
- Optional<String> optionalFlowExecutionId) throws IOException,
InterruptedException {
+ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec
flowSpec)
+ throws IOException, InterruptedException {
Config flowConfig = flowSpec.getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
@@ -93,7 +89,7 @@ public final class FlowCompilationValidationHelper {
return Optional.absent();
}
- addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId,
jobExecutionPlanDagOptional.get());
+ addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDagOptional.get());
flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}
@@ -101,25 +97,25 @@ public final class FlowCompilationValidationHelper {
/**
* Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
* already running and emits a FLOW FAILED event. Otherwise, this check
passes.
- * @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow
and compile spec, else absent Optional
+ * @return Optional<Dag<JobExecutionPlan>> containing the compiled dag if the caller is allowed to execute the
flow and compile the flowSpec (absent if compilation returned null), else Optional.absent()
* @throws IOException
*/
- public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Config flowConfig, Spec spec,
+ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
String flowGroup, String flowName) throws IOException {
boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
isFlowConcurrencyEnabled);
- Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup,
allowConcurrentExecution)) {
return Optional.fromNullable(jobExecutionPlanDag);
} else {
log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ "concurrent executions are disabled for this flow.", flowGroup,
flowName);
- sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
SharedFlowMetricsSingleton.CompiledState.SKIPPED);
Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
- if (!isScheduledFlow((FlowSpec) spec)) {
+ if (!flowSpec.isScheduled()) {
// For ad-hoc flow, we might already increase quota, we need to
decrease here
for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
quotaManager.releaseQuota(dagNode);
@@ -127,9 +123,9 @@ public final class FlowCompilationValidationHelper {
}
// Send FLOW_FAILED event
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
- + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
+ + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flowSpec to change this behaviour.");
new TimingEvent(eventSubmitter,
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
return Optional.absent();
}
@@ -150,11 +146,11 @@ public final class FlowCompilationValidationHelper {
/**
* Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
- * @param spec
+ * @param flowSpec
* @param flowMetadata
*/
- public static void populateFlowCompilationFailedEventMessage(EventSubmitter
eventSubmitter, Spec spec,
- Map<String, String> flowMetadata) {
+ public static void populateFlowCompilationFailedEventMessage(EventSubmitter
eventSubmitter,
+ FlowSpec flowSpec, Map<String, String> flowMetadata) {
// For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
// compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
// In this case, the current time is used as the flow executionId.
@@ -162,8 +158,8 @@ public final class FlowCompilationValidationHelper {
Long.toString(System.currentTimeMillis()));
String message = "Flow was not compiled successfully.";
- if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
- message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
+ if (!flowSpec.getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " +
flowSpec.getCompilationErrors();
}
flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
@@ -171,32 +167,13 @@ public final class FlowCompilationValidationHelper {
}
/**
- * If it is a scheduled flow (which does not have flowExecutionId in the
FlowSpec) and the flow compilation is
- * successful, retrieve flowExecutionId from the JobSpec.
+ * If it is a scheduled flow run without multi-active scheduler
configuration (where the FlowSpec does not have a
+ * flowExecutionId) and the flow compilation is successful, retrieve
flowExecutionId from the JobSpec.
*/
public static void addFlowExecutionIdIfAbsent(Map<String,String>
flowMetadata,
Dag<JobExecutionPlan> jobExecutionPlanDag) {
- addFlowExecutionIdIfAbsent(flowMetadata, Optional.absent(),
jobExecutionPlanDag);
- }
-
- /**
- * If it is a scheduled flow (which does not have flowExecutionId in the
FlowSpec) and the flow compilation is
- * successful, add a flowExecutionId using the optional parameter if it
exists otherwise retrieve it from the JobSpec.
- */
- public static void addFlowExecutionIdIfAbsent(Map<String,String>
flowMetadata,
- Optional<String> optionalFlowExecutionId, Dag<JobExecutionPlan>
jobExecutionPlanDag) {
- if (optionalFlowExecutionId.isPresent()) {
-
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
optionalFlowExecutionId.get());
- }
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
}
-
- /**
- * Return true if the spec contains a schedule, false otherwise.
- */
- public static boolean isScheduledFlow(FlowSpec spec) {
- return
spec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY);
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 647bb8089..a6f11e08f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -34,6 +34,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
@@ -275,9 +276,13 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
FlowSpec spec = null;
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
- spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
- // Pass flowExecutionId to DagManager to be used for scheduled flows
that do not already contain a flowExecutionId
- this.orchestrator.submitFlowToDagManager(spec, dagAction);
+ spec = flowCatalog.getSpecs(flowUri);
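+ /* Update the spec to contain the consensus flowExecutionId from the dagAction. The property is set
+ unconditionally (addProperty overwrites any existing value): scheduled flows that do not already contain a
+ flowExecutionId receive it here, while ad-hoc flowSpecs are already consistent with the dagAction so there's
+ no effective change. It's crucial to adopt the consensus flowExecutionId here to prevent creating a new one
+ during compilation.
+ NOTE(review): this mutates the FlowSpec instance returned by flowCatalog.getSpecs(); if the spec store can
+ return a shared/cached instance, the ID would persist into later executions - confirm.
+ */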
+ spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}",
flowId, e.getMessage());
launchSubmissionMetricProxy.markFailure();
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
index 5f778f3f8..37ab711f0 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.utils;
-import com.google.common.base.Optional;
import java.net.URISyntaxException;
import java.util.HashMap;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -35,7 +34,6 @@ import org.testng.annotations.Test;
public class FlowCompilationValidationHelperTest {
private String dagId = "testDag";
private Long jobSpecFlowExecutionId = 1234L;
- private String newFlowExecutionId = "5678";
private String existingFlowExecutionId = "9999";
private Dag<JobExecutionPlan> jobExecutionPlanDag;
@@ -46,14 +44,13 @@ public class FlowCompilationValidationHelperTest {
}
/*
- Tests that addFlowExecutionIdIfAbsent adds flowExecutionId to a
flowMetadata object when it is absent, prioritizing
- the optional flowExecutionId over the one from the job spec
+ Tests that addFlowExecutionIdIfAbsent adds the jobSpec flowExecutionId to
a flowMetadata object when it is absent
*/
@Test
public void testAddFlowExecutionIdWhenAbsent() {
HashMap<String, String> flowMetadata = new HashMap<>();
- FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
Optional.of(newFlowExecutionId), jobExecutionPlanDag);
-
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
newFlowExecutionId);
+ FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDag);
+
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
String.valueOf(jobSpecFlowExecutionId));
}
/*
@@ -63,7 +60,7 @@ public class FlowCompilationValidationHelperTest {
public void testSkipAddingFlowExecutionIdWhenPresent() {
HashMap<String, String> flowMetadata = new HashMap<>();
flowMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
existingFlowExecutionId);
- FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
Optional.of(newFlowExecutionId), jobExecutionPlanDag);
+
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,jobExecutionPlanDag);
Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD),
existingFlowExecutionId);
}
}