This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a2924600b Delete Launch Action Events After Processing (#3837)
a2924600b is described below
commit a2924600b341e3fbe4e0c16ba07a7beed8345dae
Author: umustafi <[email protected]>
AuthorDate: Wed Nov 29 12:05:09 2023 -0800
Delete Launch Action Events After Processing (#3837)
* Delete launch action event after persisting
* Fix default value for flowExecutionId retrieval from metadata map
* Address review comments and add unit test
* Code clean up
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/DagActionStoreChangeMonitorTest.java | 5 ++---
.../service/modules/orchestration/DagManager.java | 20 ++++++++++++++++----
.../modules/orchestration/Orchestrator.java | 22 +++++++++++++++-------
.../modules/orchestration/TimingEventUtils.java | 8 ++++++++
.../monitoring/DagActionStoreChangeMonitor.java | 15 ++++++---------
.../DagActionStoreChangeMonitorFactory.java | 6 ++----
.../modules/orchestration/DagManagerFlowTest.java | 3 +++
7 files changed, 52 insertions(+), 27 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 4025cf0db..dd17e2d21 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
-import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -66,8 +65,8 @@ public class DagActionStoreChangeMonitorTest {
public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads,
boolean isMultiActiveSchedulerEnabled) {
- super(topic, config, mock(DagActionStore.class), mock(DagManager.class),
numThreads, mock(FlowCatalog.class),
- mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
+ super(topic, config, mock(DagManager.class), numThreads,
mock(FlowCatalog.class), mock(Orchestrator.class),
+ isMultiActiveSchedulerEnabled);
}
protected void processMessageForTest(DecodeableKafkaRecord record) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index c90841b50..ec8376584 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -185,6 +186,10 @@ public class DagManager extends AbstractIdleService {
public String toString() {
return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
}
+
+ DagActionStore.DagAction toDagAction(DagActionStore.FlowActionType
actionType) {
+ return new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, actionType);
+ }
}
private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
@@ -218,7 +223,9 @@ public class DagManager extends AbstractIdleService {
private final long failedDagRetentionTime;
private final DagManagerMetrics dagManagerMetrics;
+ @Getter
@Inject(optional=true)
+ @VisibleForTesting
protected Optional<DagActionStore> dagActionStore;
private volatile boolean isActive = false;
@@ -312,9 +319,16 @@ public class DagManager extends AbstractIdleService {
log.warn("Skipping add dag because this instance of DagManager is not
active for dag: {}", dag);
return;
}
+
+ DagId dagId = DagManagerUtils.generateDagId(dag);
if (persist) {
- //Persist the dag
+ // Persist the dag
this.dagStateStore.writeCheckpoint(dag);
+ // After persisting the dag, its status will be tracked by active
dagManagers so the action should be deleted
+ // to avoid duplicate executions upon leadership change
+ if (this.dagActionStore.isPresent()) {
+
this.dagActionStore.get().deleteDagAction(dagId.toDagAction(DagActionStore.FlowActionType.LAUNCH));
+ }
}
int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
// Add the dag to the specific queue determined by flowExecutionId
@@ -322,7 +336,7 @@ public class DagManager extends AbstractIdleService {
// flow create request was forwarded. This is because Azkaban Exec Id is
stored in the DagNode of the
// specific DagManagerThread queue
if (!this.runQueue[queueId].offer(dag)) {
- throw new IOException("Could not add dag" +
DagManagerUtils.generateDagId(dag) + "to queue");
+ throw new IOException("Could not add dag" + dagId + "to queue");
}
if (setStatus) {
submitEventsAndSetStatus(dag);
@@ -511,8 +525,6 @@ public class DagManager extends AbstractIdleService {
log.warn("Failed flow compilation of spec causing launch flow event to
be skipped on startup. Flow {}", flowId);
this.dagManagerMetrics.incrementFailedLaunchCount();
}
- // Upon handling the action, delete it so on leadership change this is
not duplicated
- this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
log.warn(String.format("Could not create URI object for flowId %s due to
exception", flowId), e);
this.dagManagerMetrics.incrementFailedLaunchCount();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 0461bb11f..023f87117 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -250,6 +250,16 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDagOptional.get());
+ java.util.Optional<String> flowExecutionId =
TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);
+
+ // Unexpected result because flowExecutionId should be provided by the above
call to 'addFlowExecutionIdIfAbsent'
+ if (!flowExecutionId.isPresent()) {
+ _log.warn("FlowMetadata does not contain flowExecutionId when it
should have been provided. Skipping execution "
+ + "of: {}", spec);
+ return;
+ }
+ DagActionStore.DagAction flowAction =
+ new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId.get(), DagActionStore.FlowActionType.LAUNCH);
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving
event from DagActionStoreChangeMonitor later
@@ -266,9 +276,6 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
return;
}
- String flowExecutionId =
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
- DagActionStore.DagAction flowAction =
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent);
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
@@ -306,7 +313,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Spec jobSpec = jobExecutionPlan.getJobSpec();
if (!((JobSpec)
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
- _log.warn("JobSpec does not contain flowExecutionId.");
+ _log.warn("JobSpec does not contain flowExecutionId: {}",
jobSpec);
}
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
@@ -335,9 +342,10 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
- public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String>
optionalFlowExecutionId) throws IOException, InterruptedException {
+ public void submitFlowToDagManager(FlowSpec flowSpec,
DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
optionalFlowExecutionId);
+
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+ Optional.of(flowAction.getFlowExecutionId()));
if (optionalJobExecutionPlanDag.isPresent()) {
submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
} else {
@@ -349,7 +357,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan>
jobExecutionPlanDag)
throws IOException {
try {
- //Send the dag to the DagManager.
+ // Send the dag to the DagManager
this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " +
ex.getMessage();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 99661305f..7947ed0ae 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -21,6 +21,7 @@ import java.util.Map;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
+import java.util.Optional;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
@@ -47,6 +48,13 @@ public class TimingEventUtils {
return metadata;
}
+ /**
+ * Retrieves the flowExecutionId from the flowMetadata map if one exists; otherwise returns
an empty Optional.
+ */
+ public static Optional<String>
getFlowExecutionIdFromFlowMetadata(Map<String, String> flowMetadata) {
+ return
Optional.ofNullable(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+ }
+
static Map<String, String> getJobMetadata(Map<String, String> flowMetadata,
JobExecutionPlan jobExecutionPlan) {
Map<String, String> jobMetadata = Maps.newHashMap();
JobSpec jobSpec = jobExecutionPlan.getJobSpec();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index e9497c526..573faff01 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.service.monitoring;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -78,7 +77,6 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
protected LoadingCache<String, String>
dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10,
TimeUnit.MINUTES).build(cacheLoader);
- protected DagActionStore dagActionStore;
@Getter
@VisibleForTesting
protected DagManager dagManager;
@@ -90,13 +88,12 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
- public DagActionStoreChangeMonitor(String topic, Config config,
DagActionStore dagActionStore, DagManager dagManager,
- int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator,
boolean isMultiActiveSchedulerEnabled) {
+ public DagActionStoreChangeMonitor(String topic, Config config, DagManager
dagManager, int numThreads,
+ FlowCatalog flowCatalog, Orchestrator orchestrator, boolean
isMultiActiveSchedulerEnabled) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
numThreads);
- this.dagActionStore = dagActionStore;
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
@@ -168,7 +165,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
throw new RuntimeException(String.format("Received LAUNCH
dagAction while not in multi-active scheduler "
+ "mode for flowAction: %s", dagAction));
}
- submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId);
+ submitFlowToDagManagerHelper(dagAction);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL,
RESUME, or LAUNCH", dagActionType);
this.unexpectedErrors.mark();
@@ -195,15 +192,15 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
- protected void submitFlowToDagManagerHelper(String flowGroup, String
flowName, String flowExecutionId) {
+ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction
dagAction) {
// Retrieve job execution plan by recompiling the flow spec to send to the
DagManager
- FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ FlowId flowId = new
FlowId().setFlowGroup(dagAction.getFlowGroup()).setFlowName(dagAction.getFlowName());
FlowSpec spec = null;
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Pass flowExecutionId to DagManager to be used for scheduled flows
that do not already contain a flowExecutionId
- this.orchestrator.submitFlowToDagManager(spec,
Optional.of(flowExecutionId));
+ this.orchestrator.submitFlowToDagManager(spec, dagAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}",
flowId, e.getMessage());
this.failedFlowLaunchSubmissions.mark();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 5806949a8..1dba94bae 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -42,7 +42,6 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY =
"numThreads";
private final Config config;
- private DagActionStore dagActionStore;
private DagManager dagManager;
private FlowCatalog flowCatalog;
private Orchestrator orchestrator;
@@ -53,7 +52,6 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
FlowCatalog flowCatalog, Orchestrator orchestrator,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled) {
this.config = Objects.requireNonNull(config);
- this.dagActionStore = dagActionStore;
this.dagManager = dagManager;
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
@@ -68,8 +66,8 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
- return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagActionStore, this.dagManager,
- numThreads, flowCatalog, orchestrator, isMultiActiveSchedulerEnabled);
+ return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagManager, numThreads, flowCatalog,
+ orchestrator, isMultiActiveSchedulerEnabled);
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 5445a2096..e756a3e61 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -130,7 +130,10 @@ public class DagManagerFlowTest {
.thenReturn(Collections.singletonList(flowExecutionId3));
// mock add spec
+ // for the very first dag to be added, add a dag action to the store and check that it is
deleted by the addDag call
+ dagManager.getDagActionStore().get().addDagAction("group0", "flow0",
Long.toString(flowExecutionId1), DagActionStore.FlowActionType.LAUNCH);
dagManager.addDag(dag1, true, true);
+ Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0",
"flow0", Long.toString(flowExecutionId1),
DagActionStore.FlowActionType.LAUNCH));
dagManager.addDag(dag2, true, true);
dagManager.addDag(dag3, true, true);