This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0d3b3b544 Add flow execution id to flow metadata for multi-active case
(#3733)
0d3b3b544 is described below
commit 0d3b3b544efac7647094a131f9c8947351d40511
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 8 04:58:03 2023 -0700
Add flow execution id to flow metadata for multi-active case (#3733)
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/service/modules/orchestration/FlowTriggerHandler.java | 2 ++
.../org/apache/gobblin/service/modules/orchestration/Orchestrator.java | 1 +
2 files changed, 3 insertions(+)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 90379e730..410350b88 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -104,6 +104,8 @@ public class FlowTriggerHandler {
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
+ log.info("Multi-active scheduler about to handle trigger event: [{},
triggerEventTimestamp: {}]", flowAction,
+ eventTimeMillis);
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 186b6d81c..c0dc8b209 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -244,6 +244,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
return;
}
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
jobExecutionPlanDagOptional.get());
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving
event from DagActionStoreChangeMonitor later