This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a2924600b Delete Launch Action Events After Processing (#3837)
a2924600b is described below

commit a2924600b341e3fbe4e0c16ba07a7beed8345dae
Author: umustafi <[email protected]>
AuthorDate: Wed Nov 29 12:05:09 2023 -0800

    Delete Launch Action Events After Processing (#3837)
    
    * Delete launch action event after persisting
    
    * Fix default value for flowExecutionId retrieval from metadata map
    
    * Address review comments and add unit test
    
    * Code clean up
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/DagActionStoreChangeMonitorTest.java   |  5 ++---
 .../service/modules/orchestration/DagManager.java  | 20 ++++++++++++++++----
 .../modules/orchestration/Orchestrator.java        | 22 +++++++++++++++-------
 .../modules/orchestration/TimingEventUtils.java    |  8 ++++++++
 .../monitoring/DagActionStoreChangeMonitor.java    | 15 ++++++---------
 .../DagActionStoreChangeMonitorFactory.java        |  6 ++----
 .../modules/orchestration/DagManagerFlowTest.java  |  3 +++
 7 files changed, 52 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 4025cf0db..dd17e2d21 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
-import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
@@ -66,8 +65,8 @@ public class DagActionStoreChangeMonitorTest {
 
     public MockDagActionStoreChangeMonitor(String topic, Config config, int 
numThreads,
         boolean isMultiActiveSchedulerEnabled) {
-      super(topic, config, mock(DagActionStore.class), mock(DagManager.class), 
numThreads, mock(FlowCatalog.class),
-          mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
+      super(topic, config, mock(DagManager.class), numThreads, 
mock(FlowCatalog.class), mock(Orchestrator.class),
+          isMultiActiveSchedulerEnabled);
     }
 
     protected void processMessageForTest(DecodeableKafkaRecord record) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index c90841b50..ec8376584 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -185,6 +186,10 @@ public class DagManager extends AbstractIdleService {
     public String toString() {
       return Joiner.on("_").join(flowGroup, flowName, flowExecutionId);
     }
+
+    DagActionStore.DagAction toDagAction(DagActionStore.FlowActionType 
actionType) {
+      return new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, actionType);
+    }
   }
 
   private final BlockingQueue<Dag<JobExecutionPlan>>[] runQueue;
@@ -218,7 +223,9 @@ public class DagManager extends AbstractIdleService {
   private final long failedDagRetentionTime;
   private final DagManagerMetrics dagManagerMetrics;
 
+  @Getter
   @Inject(optional=true)
+  @VisibleForTesting
   protected Optional<DagActionStore> dagActionStore;
 
   private volatile boolean isActive = false;
@@ -312,9 +319,16 @@ public class DagManager extends AbstractIdleService {
       log.warn("Skipping add dag because this instance of DagManager is not 
active for dag: {}", dag);
       return;
     }
+
+    DagId dagId = DagManagerUtils.generateDagId(dag);
     if (persist) {
-      //Persist the dag
+      // Persist the dag
       this.dagStateStore.writeCheckpoint(dag);
+      // After persisting the dag, its status will be tracked by active 
dagManagers so the action should be deleted
+      // to avoid duplicate executions upon leadership change
+      if (this.dagActionStore.isPresent()) {
+        
this.dagActionStore.get().deleteDagAction(dagId.toDagAction(DagActionStore.FlowActionType.LAUNCH));
+      }
     }
     int queueId = DagManagerUtils.getDagQueueId(dag, this.numThreads);
     // Add the dag to the specific queue determined by flowExecutionId
@@ -322,7 +336,7 @@ public class DagManager extends AbstractIdleService {
     // flow create request was forwarded. This is because Azkaban Exec Id is 
stored in the DagNode of the
     // specific DagManagerThread queue
     if (!this.runQueue[queueId].offer(dag)) {
-      throw new IOException("Could not add dag" + 
DagManagerUtils.generateDagId(dag) + "to queue");
+      throw new IOException("Could not add dag" + dagId + "to queue");
     }
     if (setStatus) {
       submitEventsAndSetStatus(dag);
@@ -511,8 +525,6 @@ public class DagManager extends AbstractIdleService {
         log.warn("Failed flow compilation of spec causing launch flow event to 
be skipped on startup. Flow {}", flowId);
         this.dagManagerMetrics.incrementFailedLaunchCount();
       }
-      // Upon handling the action, delete it so on leadership change this is 
not duplicated
-      this.dagActionStore.get().deleteDagAction(launchAction);
     } catch (URISyntaxException e) {
       log.warn(String.format("Could not create URI object for flowId %s due to 
exception", flowId), e);
       this.dagManagerMetrics.incrementFailedLaunchCount();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 0461bb11f..023f87117 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -250,6 +250,16 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       }
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
       FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDagOptional.get());
+      java.util.Optional<String> flowExecutionId = 
TimingEventUtils.getFlowExecutionIdFromFlowMetadata(flowMetadata);
+
+      // Unexpected result because flowExecutionId should be provided by above 
call to 'addFlowExecutionIdIfAbsent'
+      if (!flowExecutionId.isPresent()) {
+        _log.warn("FlowMetadata does not contain flowExecutionId when it 
should have been provided. Skipping execution "
+            + "of: {}", spec);
+        return;
+      }
+      DagActionStore.DagAction flowAction =
+          new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId.get(), DagActionStore.FlowActionType.LAUNCH);
 
       // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
       // Skip flow compilation as well, since we recompile after receiving 
event from DagActionStoreChangeMonitor later
@@ -266,9 +276,6 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
           return;
         }
 
-        String flowExecutionId = 
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
-        DagActionStore.DagAction flowAction =
-            new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
         flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis, isReminderEvent);
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
             flowAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
@@ -306,7 +313,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
               Spec jobSpec = jobExecutionPlan.getJobSpec();
 
               if (!((JobSpec) 
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
-                _log.warn("JobSpec does not contain flowExecutionId.");
+                _log.warn("JobSpec does not contain flowExecutionId: {}", 
jobSpec);
               }
 
               Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
@@ -335,9 +342,10 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec, Optional<String> 
optionalFlowExecutionId) throws IOException, InterruptedException {
+  public void submitFlowToDagManager(FlowSpec flowSpec, 
DagActionStore.DagAction flowAction) throws IOException, InterruptedException {
     Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
-        
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec, 
optionalFlowExecutionId);
+        
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec,
+            Optional.of(flowAction.getFlowExecutionId()));
     if (optionalJobExecutionPlanDag.isPresent()) {
       submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
     } else {
@@ -349,7 +357,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> 
jobExecutionPlanDag)
       throws IOException {
     try {
-      //Send the dag to the DagManager.
+      // Send the dag to the DagManager
       this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
     } catch (Exception ex) {
       String failureMessage = "Failed to add Job Execution Plan due to: " + 
ex.getMessage();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 99661305f..7947ed0ae 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
+import java.util.Optional;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
@@ -47,6 +48,13 @@ public class TimingEventUtils {
     return metadata;
   }
 
+  /**
+   * Retrieves the flowExecutionId from the flowMetadata map if one exists; otherwise 
returns an empty Optional.
+   */
+  public static Optional<String> 
getFlowExecutionIdFromFlowMetadata(Map<String, String> flowMetadata) {
+    return 
Optional.ofNullable(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+  }
+
   static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, 
JobExecutionPlan jobExecutionPlan) {
     Map<String, String> jobMetadata = Maps.newHashMap();
     JobSpec jobSpec = jobExecutionPlan.getJobSpec();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index e9497c526..573faff01 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.service.monitoring;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -78,7 +77,6 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   protected LoadingCache<String, String>
       dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, 
TimeUnit.MINUTES).build(cacheLoader);
 
-  protected DagActionStore dagActionStore;
   @Getter
   @VisibleForTesting
   protected DagManager dagManager;
@@ -90,13 +88,12 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
-  public DagActionStoreChangeMonitor(String topic, Config config, 
DagActionStore dagActionStore, DagManager dagManager,
-      int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator, 
boolean isMultiActiveSchedulerEnabled) {
+  public DagActionStoreChangeMonitor(String topic, Config config, DagManager 
dagManager, int numThreads,
+      FlowCatalog flowCatalog, Orchestrator orchestrator, boolean 
isMultiActiveSchedulerEnabled) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + 
UUID.randomUUID().toString())),
         numThreads);
-    this.dagActionStore = dagActionStore;
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
@@ -168,7 +165,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
             throw new RuntimeException(String.format("Received LAUNCH 
dagAction while not in multi-active scheduler "
                 + "mode for flowAction: %s", dagAction));
           }
-          submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId);
+          submitFlowToDagManagerHelper(dagAction);
         } else {
           log.warn("Received unsupported dagAction {}. Expected to be a KILL, 
RESUME, or LAUNCH", dagActionType);
           this.unexpectedErrors.mark();
@@ -195,15 +192,15 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
-  protected void submitFlowToDagManagerHelper(String flowGroup, String 
flowName, String flowExecutionId) {
+  protected void submitFlowToDagManagerHelper(DagActionStore.DagAction 
dagAction) {
     // Retrieve job execution plan by recompiling the flow spec to send to the 
DagManager
-    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    FlowId flowId = new 
FlowId().setFlowGroup(dagAction.getFlowGroup()).setFlowName(dagAction.getFlowName());
     FlowSpec spec = null;
     try {
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
       spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
       // Pass flowExecutionId to DagManager to be used for scheduled flows 
that do not already contain a flowExecutionId
-      this.orchestrator.submitFlowToDagManager(spec, 
Optional.of(flowExecutionId));
+      this.orchestrator.submitFlowToDagManager(spec, dagAction);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {}. Exception {}", 
flowId, e.getMessage());
       this.failedFlowLaunchSubmissions.mark();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 5806949a8..1dba94bae 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -42,7 +42,6 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
   static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = 
"numThreads";
 
   private final Config config;
-  private DagActionStore dagActionStore;
   private DagManager dagManager;
   private FlowCatalog flowCatalog;
   private Orchestrator orchestrator;
@@ -53,7 +52,6 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
       FlowCatalog flowCatalog, Orchestrator orchestrator,
       @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
     this.config = Objects.requireNonNull(config);
-    this.dagActionStore = dagActionStore;
     this.dagManager = dagManager;
     this.flowCatalog = flowCatalog;
     this.orchestrator = orchestrator;
@@ -68,8 +66,8 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
     String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
-    return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagActionStore, this.dagManager,
-        numThreads, flowCatalog, orchestrator, isMultiActiveSchedulerEnabled);
+    return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagManager, numThreads, flowCatalog,
+        orchestrator, isMultiActiveSchedulerEnabled);
   }
 
   @Override
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 5445a2096..e756a3e61 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -130,7 +130,10 @@ public class DagManagerFlowTest {
         .thenReturn(Collections.singletonList(flowExecutionId3));
 
     // mock add spec
+    // for very first dag to be added, add dag action to store and check it is 
deleted by the addDag call
+    dagManager.getDagActionStore().get().addDagAction("group0", "flow0", 
Long.toString(flowExecutionId1), DagActionStore.FlowActionType.LAUNCH);
     dagManager.addDag(dag1, true, true);
+    Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0", 
"flow0", Long.toString(flowExecutionId1), 
DagActionStore.FlowActionType.LAUNCH));
     dagManager.addDag(dag2, true, true);
     dagManager.addDag(dag3, true, true);
 

Reply via email to