phet commented on code in PR #3899:
URL: https://github.com/apache/gobblin/pull/3899#discussion_r1540581956


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionExecutionMultiActiveLeaseArbiterFactory.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+/**
+ * A factory implementation that returns a {@link MultiActiveLeaseArbiter} 
instance used by the
+ * {@link DagManagementTaskStreamImpl} in multi-active execution mode
+ */
+@Slf4j
+public class DagActionExecutionMultiActiveLeaseArbiterFactory extends 
MultiActiveLeaseArbiterFactory {

Review Comment:
   nit: might align better w/ other class naming as 
`DagActionProcessingMultiActiveLeaseArbiterFactory`



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -100,13 +100,13 @@ public class ConfigurationKeys {
   public static final String 
MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = 
MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
   public static final long 
DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 
60; // (3 days in seconds)
   // Scheduler lease determination store configuration
+  public static final String SCHEDULER_LEASE_ARBITER_NAME = 
"SchedulerFlowLaunchLeaseArbiter";
+  public static final String EXECUTOR_LEASE_ARBITER_NAME = 
"DagActionExecutorLeaseArbiter";

Review Comment:
   if you do rename to `DagActionProcessingMALA`, this too would become 
`PROCESSING_LEASE_...`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -202,21 +203,28 @@ public void configure(Binder binder) {
     OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
     OptionalBinder.newOptionalBinder(binder, DagProcFactory.class);
     OptionalBinder.newOptionalBinder(binder, DagProcessingEngine.class);
+    OptionalBinder.newOptionalBinder(binder, DagActionReminderScheduler.class);
     if (serviceConfig.isDagProcessingEngineEnabled()) {
-      binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
-      binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
-      
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class).in(Singleton.class);
-      binder.bind(DagProcFactory.class);
-      binder.bind(DagProcessingEngine.class);
-
+      /* The only way to differentiate two instances of the same class is with 
an `annotatedWith` marker provided at

Review Comment:
   excellent comment, BTW - in 10 seconds I learned what might have otherwise 
cost me 30 mins! ;)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -53,20 +74,30 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
 
   @Inject(optional=true)
   protected Optional<DagActionStore> dagActionStore;
-  protected Optional<ReminderSettingDagProcLeaseArbiter> 
reminderSettingDagProcLeaseArbiter;
+  protected MultiActiveLeaseArbiter dagActionExecutionLeaseArbiter;

Review Comment:
   reminder to rename if we change `s/Execution/Processing/`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -58,28 +57,32 @@
  * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, 
Optional<Dag<JobExecutionPlan>>> {
-  private final LaunchDagTask launchDagTask;
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
   // todo - this is not orchestration delay and should be renamed. keeping it 
the same because DagManager is also using
   // the same name
   private static final AtomicLong orchestrationDelayCounter = new 
AtomicLong(0);
+
+  public LaunchDagProc(DagTask dagTask, FlowCompilationValidationHelper 
flowCompilationValidationHelper) {
+    super(dagTask);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+  }
+
   static {
     metricContext.register(
         
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get));
   }
 
   @Override
   protected DagManager.DagId getDagId() {
-    return this.launchDagTask.getDagId();
+    return this.getDagTask().getDagId();
   }

Review Comment:
   why not implement in the base class?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java:
##########
@@ -76,8 +78,11 @@ public void setUp() throws Exception {
     topologySpecMap.put(specExecURI, topologySpec);
     MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    // TODO: create tests for cases with multiActiveExecutionEnabled
     this.dagManagementTaskStream =
-        new DagManagementTaskStreamImpl(config, Optional.empty(), 
Optional.of(mock(ReminderSettingDagProcLeaseArbiter.class)));
+        new DagManagementTaskStreamImpl(config, Optional.empty(),
+            mock(InstrumentedLeaseArbiter.class), 
Optional.of(mock(DagActionReminderScheduler.class)),

Review Comment:
   mock should be of MALA, not decorator



##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -266,6 +277,16 @@ public void cleanUp() throws Exception {
     mysql.stop();
   }
 
+  /**
+   * Tests retrieving two classes initiated by GobblinServiceGuiceModule. One 
is explicitly bound and another optionally
+   * through a configuration enabled in the setup method above.
+   */
+  @Test
+  public void testGetClass() {
+    Assert.assertTrue(this.gobblinServiceManager.getClass(DagManager.class) 
instanceof DagManager);

Review Comment:
   nit: as we anticipate soon deprecating`DagManager`, is there another class 
you could use for this test?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -57,20 +64,95 @@ public DagActionReminderScheduler(StdSchedulerFactory 
schedulerFactory, Optional
    */
   public void scheduleReminder(DagActionStore.DagAction dagAction, long 
reminderDurationMillis)
       throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
-    Trigger trigger = 
ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction, 
reminderDurationMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws 
SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
     quartzScheduler.deleteJob(jobDetail.getKey());
   }
 
+  /**
+   * Static class used to store information regarding a pending dagAction that 
needs to be revisited at a later time
+   * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
+   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
+   */
+  @Slf4j
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(
+          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+
+      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
+          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+          dagActionType);
+
+      try {
+        DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
+        dagManagement.addDagAction(dagAction);
+      } catch (IOException e) {
+        log.error("Failed to add DagAction to DagManagement. Action: {}", 
dagAction);
+      }
+    }
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating all dagAction fields
+   */
+  public static String createDagActionReminderKey(DagActionStore.DagAction 
dagAction) {
+    return createDagActionReminderKey(dagAction.getFlowName(), 
dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
+        dagAction.getJobName(), dagAction.getDagActionType());
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating flowName, flowGroup, 
flowExecutionId, jobName, dagActionType
+   * in that order
+   */
+  public static String createDagActionReminderKey(String flowName, String 
flowGroup, String flowId, String jobName,
+      DagActionStore.DagActionType dagActionType) {
+    return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId, 
jobName, dagActionType);
+  }

Review Comment:
   if never called from beyond this class, suggest to fold the two overloaded 
forms into one `public static`, just as you've done w/ `createReminderJobDetail`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -118,4 +172,16 @@ private DagTask createDagTask(DagActionStore.DagAction 
dagAction, MultiActiveLea
         throw new UnsupportedOperationException("Not yet implemented");
     }
   }
+
+  /* Schedules a reminder for the flow action using {@link 
DagActionReminderScheduler} to reattempt the lease after the
+  current leaseholder's grant would have expired.
+  */
+  protected void 
scheduleReminderForEvent(MultiActiveLeaseArbiter.LeaseAttemptStatus leaseStatus)
+      throws SchedulerException {
+    if (!this.dagActionReminderScheduler.isPresent()) {
+      throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
+    }

Review Comment:
   can't we fail-fast when mis-configuration (E.g. in the ctor), rather than 
waiting JIT once we eventually want to use something to detect the error?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -108,6 +132,36 @@ public DagTask next() {
     return null;
   }
 
+  /**
+   * Returns a {@link 
org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseAttemptStatus} 
associated with the
+   * `dagAction`. If in multi-active execution mode, it retrieves the status 
from calling
+   * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction, 
long, boolean, boolean)}, otherwise
+   * it returns a {@link 
org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseObtainedStatus} 
that will not
+   * expire for a very long time to the current instance.
+   * @param dagAction
+   * @return
+   * @throws IOException
+   * @throws SchedulerException
+   */
+  private MultiActiveLeaseArbiter.LeaseAttemptStatus 
retrieveLeaseStatus(DagActionStore.DagAction dagAction)
+      throws IOException, SchedulerException {
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
+    if (!this.isMultiActiveExecutionEnabled) {
+      leaseAttemptStatus = new 
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, 
System.currentTimeMillis(), Long.MAX_VALUE, null);

Review Comment:
   let's keep the code simple by using an alternative MALA rather than directly 
faking a `LeaseObtainedStatus`.  really, that's the beauty of what DI allows!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -43,6 +49,21 @@
 /**
  * DagManagementTaskStreamImpl implements {@link DagManagement} and {@link 
DagTaskStream}. It accepts
  * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}s and 
iteratively provides {@link DagTask}.
+ *
+ * If multi-active execution is enabled, then it uses {@link 
MultiActiveLeaseArbiter} to coordinate multiple hosts with
+ * execution components enabled to respond to flow action events by attempting 
ownership over a flow action event at a

Review Comment:
   "flow action" => "dag action"



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -117,12 +117,16 @@ public long getEventTimeMillis() {
     }
 
     /**
-     * Completes the lease referenced by this status object if it has not 
expired.
+     * Completes the lease referenced by this status object if it has not 
expired. Defaults to true if no lease arbiter
+     * to complete lease on.
      * @return true if able to complete lease, false otherwise.
      * @throws IOException
      */
     public boolean completeLease() throws IOException {
-      return multiActiveLeaseArbiter.recordLeaseSuccess(this);
+      if (multiActiveLeaseArbiter != null) {

Review Comment:
   actually, sorry, I don't understand how would we ever have a lease w/o a 
real MALA attached?
   
   if this arose from a desire to "mock" `LeaseObtainedStatus` for a 
"no-op"/always-true MALA impl, I'd suggest instead having that impl nonetheless 
pass itself (`this`) as the MALA param, yet then override `recordLeaseSuccess`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -57,20 +64,95 @@ public DagActionReminderScheduler(StdSchedulerFactory 
schedulerFactory, Optional
    */
   public void scheduleReminder(DagActionStore.DagAction dagAction, long 
reminderDurationMillis)
       throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
-    Trigger trigger = 
ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction, 
reminderDurationMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws 
SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
     quartzScheduler.deleteJob(jobDetail.getKey());
   }
 
+  /**
+   * Static class used to store information regarding a pending dagAction that 
needs to be revisited at a later time
+   * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
+   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
+   */
+  @Slf4j
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(
+          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+
+      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
+          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+          dagActionType);
+
+      try {
+        DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
+        dagManagement.addDagAction(dagAction);
+      } catch (IOException e) {
+        log.error("Failed to add DagAction to DagManagement. Action: {}", 
dagAction);
+      }
+    }
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating all dagAction fields
+   */
+  public static String createDagActionReminderKey(DagActionStore.DagAction 
dagAction) {
+    return createDagActionReminderKey(dagAction.getFlowName(), 
dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
+        dagAction.getJobName(), dagAction.getDagActionType());
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating flowName, flowGroup, 
flowExecutionId, jobName, dagActionType
+   * in that order
+   */
+  public static String createDagActionReminderKey(String flowName, String 
flowGroup, String flowId, String jobName,
+      DagActionStore.DagActionType dagActionType) {
+    return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId, 
jobName, dagActionType);
+  }
+
+  /**
+   * Creates a jobDetail containing flow and job identifying information in 
the jobDataMap, uniquely identified
+   *  by a key comprised of the dagAction's fields.
+   */
+  public static JobDetail createReminderJobDetail(DagActionStore.DagAction 
dagAction) {
+    JobDataMap dataMap = new JobDataMap();
+    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
+    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
+    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
+    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagAction.getDagActionType());
+
+    return JobBuilder.newJob(ReminderJob.class)
+        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowName())
+        .usingJobData(dataMap)
+        .build();
+  }
+
+  /**
+   * Creates a Trigger object with the same key as the ReminderJob (since only 
one trigger is expected to be associated
+   * with a job at any given time) that should fire after 
`reminderDurationMillis` millis.
+   */
+  public static Trigger createReminderJobTrigger(DagActionStore.DagAction 
dagAction, long reminderDurationMillis) {
+    Trigger trigger = TriggerBuilder.newTrigger()
+        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowName())
+        .startAt(new Date(System.currentTimeMillis() + reminderDurationMillis))

Review Comment:
   hard-coded "clocks" make testing challenging.  AFAICT, this method here 
should just take the desired target time in millis (rather than the offset).
   
   that leaves the caller, `scheduleReminder`, the need for obtaining the 
current time.  for that, I suggest a `Supplier<Long> getCurrentTimeMillis` 
member.  pair that with a ctor form taking such a param (when testing).  the 
ctor w/o that extra param would simply pass in `System::currentTimeMillis`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -108,6 +137,36 @@ public DagTask next() {
     return null;
   }
 
+  /**
+   * Returns a {@link 
org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseAttemptStatus} 
associated with the
+   * `dagAction`. If in multi-active execution mode, it retrieves the status 
from calling
+   * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction, 
long, boolean, boolean)}, otherwise
+   * it returns a {@link 
org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseObtainedStatus} 
that will not
+   * expire for a very long time to the current instance.
+   * @param dagAction
+   * @return
+   * @throws IOException
+   * @throws SchedulerException
+   */
+  private MultiActiveLeaseArbiter.LeaseAttemptStatus 
retrieveLeaseStatus(DagActionStore.DagAction dagAction)
+      throws IOException, SchedulerException {
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
+    if (!this.isMultiActiveExecutionEnabled) {
+      leaseAttemptStatus = new 
MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, 
System.currentTimeMillis(), Long.MAX_VALUE, null);
+    } else {

Review Comment:
   earlier I suggested to have a "always-return-lease-obtained" MALA, but that 
won't quite work, because we still need reminders, in case the attempt to 
process didn't succeed, since that deserves retry (by the single-active host).
   
   that line of reasoning might suggest an in-memory MALA that maintains each 
status based on `LeaseObtainedStatus.conclude()` having been called yet or not. 
 when not yet it returns `LeaseObtainedStatus`, but after it has, then it 
returns `NoLongerLeasing`.
   
   but taking one futher step back... why give up durability when running in 
single-active execution mode?  is there any reason not to use a bona fide 
`MysqlMALA` and merely know there would be no contention in the DB, so 
`LeasedToAnotherStatus` is never returned?
   
   regardless of our choice, don't we still need to clean up `DagAction`s in 
the store?  would using a genuine MALA help there too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to