This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d3b3b544 Add flow execution id to flow metadata for multi-active case 
(#3733)
0d3b3b544 is described below

commit 0d3b3b544efac7647094a131f9c8947351d40511
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 8 04:58:03 2023 -0700

    Add flow execution id to flow metadata for multi-active case (#3733)
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/service/modules/orchestration/FlowTriggerHandler.java       | 2 ++
 .../org/apache/gobblin/service/modules/orchestration/Orchestrator.java  | 1 +
 2 files changed, 3 insertions(+)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 90379e730..410350b88 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -104,6 +104,8 @@ public class FlowTriggerHandler {
   public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
     if (multiActiveLeaseArbiter.isPresent()) {
+      log.info("Multi-active scheduler about to handle trigger event: [{}, 
triggerEventTimestamp: {}]", flowAction,
+          eventTimeMillis);
       MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = 
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
       if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
         MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 186b6d81c..c0dc8b209 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -244,6 +244,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         return;
       }
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDagOptional.get());
 
       // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
       // Skip flow compilation as well, since we recompile after receiving 
event from DagActionStoreChangeMonitor later

Reply via email to