[ 
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=862493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-862493
 ]

ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/May/23 00:56
            Start Date: 27/May/23 00:56
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1207456238


##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc:
##########
@@ -23,6 +23,11 @@
     "type" : "string",
     "doc" : "flow execution id for the dag action",
     "compliance" : "NONE"
+  }, {
+    "name" : "dagAction",
+    "type" : "string",

Review Comment:
   I see reasonable arguments for an enum here, as there are only a small 
number of actions.  what do you see as pros/cons?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -137,12 +139,12 @@ public boolean deleteDagAction(String flowGroup, String flowName, String flowExe
       connection.commit();
       return result != 0;
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to delete action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure to delete action for table %s of flow with flow group:%s, flow name:%s, flow execution id:%s and dagAction: %s",
+          tableName, flowGroup, flowName, flowExecutionId, dagActionValue), e);

Review Comment:
   tip: if you used `DagAction` as a POJO, you could use its `.toString()` to 
the same effect--w/o duplication across these messages.  e.g.:
   ```
   throw new IOException(String.format("...for table %s of flow with %s", tableName, dagAction), e);
   ```

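A minimal sketch of the `toString()` suggestion, assuming `DagAction` becomes a plain value type. The class below is a hypothetical stand-in (its fields mirror the method parameters in this PR, and the bracketed rendering is an arbitrary choice), and `DagActionErrors.deleteFailure` is an illustrative helper rather than existing Gobblin code.

```java
import java.io.IOException;
import java.util.Objects;

// Hypothetical stand-in for DagActionStore.DagAction; not the class in this PR.
final class DagAction {
  enum DagActionValue { KILL, RESUME, LAUNCH }

  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final DagActionValue dagActionValue;

  DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId, "flowExecutionId");
    this.dagActionValue = Objects.requireNonNull(dagActionValue, "dagActionValue");
  }

  // One canonical rendering, reused by every log and error message.
  @Override
  public String toString() {
    return String.format("[flowGroup: %s, flowName: %s, flowExecutionId: %s, dagAction: %s]",
        flowGroup, flowName, flowExecutionId, dagActionValue);
  }
}

final class DagActionErrors {
  private DagActionErrors() {}

  // Hypothetical helper: names the table once and delegates the flow details to toString().
  static IOException deleteFailure(String tableName, DagAction dagAction, Exception cause) {
    return new IOException(
        String.format("Failure to delete action for table %s of flow with %s", tableName, dagAction), cause);
  }
}
```

With this in place, adding a field to `DagAction` later changes one `toString()` instead of every `String.format` call in `MysqlDagActionStore`.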


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -310,6 +316,17 @@ public void orchestrate(Spec spec) throws Exception {
         flowCompilationTimer.get().stop(flowMetadata);
       }
 
+      // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
+      if (this.isMultiActiveSchedulerEnabled) {
+        String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+        boolean leaseAttemptSucceeded = schedulerLeaseAlgoHandler.handleNewTriggerEvent(jobProps, flowGroup, flowName,
+            flowExecutionId, SchedulerLeaseDeterminationStore.FlowActionType.LAUNCH, triggerTimestampMillis);
+        _log.info("scheduler attempted lease on flowGroup: %s, flowName: %s, flowExecutionId: %s, LAUNCH event for "

Review Comment:
   nit: this is very conversational.  I find more regularized, even robotic 
text easier to parse and grok when "grepping" a large volume of log msgs.  
suggestion:
   ```
   [flowGroup: 'foo-grp', flowName: 'bar-fn', execId: 1234] - multi-active LAUNCH(987654): FAILURE
   ```
   (where `987654` would be the trigger tstamp)
   
   later we might add other multi-active status messages, each with a type, 
like `CANCEL`, `RESUME`, `ADVANCE` (i.e. to the next job in the flow).  each 
could carry action-specific info in the parens next to it, which would identify 
exactly which instance it is, e.g.:
   ```
   [g, n, id] - multi-active ADVANCE(<<job-name>>): SUCCESS
   ```
   
   looking ahead to verification and debugging, the end result is that message 
filtering becomes trivial and flexible on so many dimensions, including:
   * for a particular flow group+name
   * for a particular flow group+name+id
   * for a particular flow group+name+id+LAUNCH(tstamp) (e.g. across hosts)
   * related to multi-active in any way
   * related to a certain multi-active "action", such as `RESUME`
   * specifically those that are `FAILURE` or `SUCCESS`
   * etc.

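A sketch of the regularized format as a small formatting helper. `MultiActiveLogFormat` and its parameters are hypothetical; only the output shape comes from the suggestion above. Separately, if `_log` is an SLF4J `Logger` (the hunk doesn't show its type), the `%s` placeholders in the current `_log.info(...)` call would be printed literally, since SLF4J substitutes `{}` placeholders; formatting the message up front, as here, sidesteps that.

```java
// Hypothetical helper; the output format follows the reviewer's suggestion, not existing Gobblin code.
final class MultiActiveLogFormat {
  enum Outcome { SUCCESS, FAILURE }

  private MultiActiveLogFormat() {}

  /**
   * Renders e.g. "[flowGroup: 'foo-grp', flowName: 'bar-fn', execId: 1234] - multi-active LAUNCH(987654): FAILURE".
   * actionDetail is the action-specific discriminator: the trigger timestamp for LAUNCH,
   * a job name for ADVANCE, and so on.
   */
  static String format(String flowGroup, String flowName, String flowExecutionId,
      String actionType, String actionDetail, Outcome outcome) {
    return String.format("[flowGroup: '%s', flowName: '%s', execId: %s] - multi-active %s(%s): %s",
        flowGroup, flowName, flowExecutionId, actionType, actionDetail, outcome);
  }
}
```

A call site would then pass `String.valueOf(triggerTimestampMillis)` as `actionDetail` and map `leaseAttemptSucceeded` to `Outcome.SUCCESS` or `Outcome.FAILURE`, so every line stays filterable on each of the dimensions listed.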


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface defines the two basic actions required for lease determination for each FlowActionType event for a flow.
+ * It is used by the {@link SchedulerLeaseAlgoHandler} to allow multiple scheduler's on different hosts to determine
+ * which scheduler is tasked with ensuring the FlowAction is taken for the trigger.
+ */
+public interface SchedulerLeaseDeterminationStore {

Review Comment:
   naming-wise: is this strictly limited to scheduling?
   
   anyway I'd encourage more evocative naming that suggests the underlying 
architectural role or pattern.  e.g. `MultiActiveLeaseArbiter`.  ("broker" and 
"controller" each carry their own design-pattern-specific connotations.)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -27,7 +27,9 @@
 public interface DagActionStore {
   enum DagActionValue {
     KILL,
-    RESUME
+    RESUME,
+    // TODO: potentially combine this enum with {@link SchedulerLeaseDeterminationStore.FlowActionType}
+    LAUNCH
   }
 
   @Getter

Review Comment:
   would it be equivalent to use the `@Data` lombok annotation, which would 
also synthesize the constructor?

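For comparison, a hand-written sketch of roughly what Lombok's `@Data` produces for a class whose fields are all `final`: public getters, a required-args constructor, `equals`/`hashCode`, and `toString` (no setters, since the fields are final). One caveat from the Lombok documentation: `@Data` generates no constructor if the class already declares one explicitly, so the existing `DagAction(...)` constructor would need to go for the synthesized one to appear. The class below is a hypothetical stand-in, and Lombok's real `equals` differs in details (it also emits a `canEqual` check).

```java
import java.util.Objects;

// Hypothetical stand-in approximating what @Data generates when every field is final.
final class DagAction {
  enum DagActionValue { KILL, RESUME, LAUNCH }

  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final DagActionValue dagActionValue;

  // @RequiredArgsConstructor part: one parameter per final field, in declaration order.
  DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.dagActionValue = dagActionValue;
  }

  // @Getter part; no setters, because the fields are final.
  public String getFlowGroup() { return flowGroup; }
  public String getFlowName() { return flowName; }
  public String getFlowExecutionId() { return flowExecutionId; }
  public DagActionValue getDagActionValue() { return dagActionValue; }

  // @EqualsAndHashCode part: value semantics over all four fields.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DagAction)) return false;
    DagAction other = (DagAction) o;
    return Objects.equals(flowGroup, other.flowGroup) && Objects.equals(flowName, other.flowName)
        && Objects.equals(flowExecutionId, other.flowExecutionId) && dagActionValue == other.dagActionValue;
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId, dagActionValue);
  }

  // @ToString part, in Lombok's "Name(field=value, ...)" style.
  @Override
  public String toString() {
    return "DagAction(flowGroup=" + flowGroup + ", flowName=" + flowName
        + ", flowExecutionId=" + flowExecutionId + ", dagActionValue=" + dagActionValue + ")";
  }
}
```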


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -120,13 +121,14 @@ public void addDagAction(String flowGroup, String flowName, String flowExecution
       insertStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group: %s, flow name"

Review Comment:
   nit: "to add" or just "adding"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java:
##########
@@ -40,12 +44,19 @@ public class DagActionStoreChangeMonitorFactory implements Provider<DagActionSto
   private final Config config;
   private DagActionStore dagActionStore;
   private DagManager dagManager;
+  private FlowCatalog flowCatalog;
+  private boolean isMultiActiveSchedulerEnabled;
 
   @Inject
-  public DagActionStoreChangeMonitorFactory(Config config, DagActionStore dagActionStore, DagManager dagManager) {
+  public DagActionStoreChangeMonitorFactory(Config config, DagActionStore dagActionStore, DagManager dagManager,
+      FlowCatalog flowCatalog, @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isMultiActiveSchedulerEnabled) {

Review Comment:
   is the mismatch in names (`WARM_STANDBY_ENABLED` vs. `isMultiActiveSchedulerEnabled`) 
intentional?  if so, please make sure there's a code comment somewhere, probably 
at the `InjectionNames` def



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -70,21 +73,23 @@ public DagAction(String flowGroup, String flowName, String flowExecutionId, DagA
    * @param flowGroup flow group for the dag action
    * @param flowName flow name for the dag action
    * @param flowExecutionId flow execution for the dag action
+   * @param dagActionValue the value of the dag action
    * @throws IOException
    * @return true if we successfully delete one record, return false if the record does not exist
    */
-  boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException;
+  boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) throws IOException;
 
   /***
    * Retrieve action value by the flow group, flow name and flow execution id from the {@link DagActionStore}.
    * @param flowGroup flow group for the dag action
    * @param flowName flow name for the dag action
    * @param flowExecutionId flow execution for the dag action
+   * @param dagActionValue the value of the dag action
    * @throws IOException Exception in retrieving the {@link DagAction}.
    * @throws SpecNotFoundException If {@link DagAction} being retrieved is not present in store.
    */
-  DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException, SpecNotFoundException,
-                                                                                           SQLException;
+  DagAction getDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue)

Review Comment:
   one thing I don't understand: a `DagAction` has exactly the four fields that 
are params to this method.  if so, does this method just duplicate `exists`?
   
   merely wondering: is there a use case for getting any and all actions 
related to a particular flow execution?
   
   relatedly w/ `deleteDagAction` (above), couldn't that take just a single 
param of type `DagAction`?

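A hypothetical sketch of the narrower API the comment hints at: every method takes a single `DagAction`, `exists` covers what `getDagAction` would otherwise duplicate, and `getDagActions` is the "all actions for one flow execution" lookup the review asks about. `SimpleDagActionStore` and `InMemoryDagActionStore` are illustrative names, the in-memory set stands in for the MySQL table, and the `IOException` declarations of the real interface are omitted.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical value type mirroring the four identifying fields discussed above.
final class DagAction {
  enum DagActionValue { KILL, RESUME, LAUNCH }

  final String flowGroup, flowName, flowExecutionId;
  final DagActionValue dagActionValue;

  DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
    this.flowGroup = Objects.requireNonNull(flowGroup);
    this.flowName = Objects.requireNonNull(flowName);
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId);
    this.dagActionValue = Objects.requireNonNull(dagActionValue);
  }

  // True when this action belongs to the given flow execution, whatever its action value.
  boolean isForExecution(String group, String name, String executionId) {
    return flowGroup.equals(group) && flowName.equals(name) && flowExecutionId.equals(executionId);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DagAction)) return false;
    DagAction that = (DagAction) o;
    return flowGroup.equals(that.flowGroup) && flowName.equals(that.flowName)
        && flowExecutionId.equals(that.flowExecutionId) && dagActionValue == that.dagActionValue;
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId, dagActionValue);
  }
}

// Hypothetical narrowed interface: single-DagAction parameters plus a per-execution query.
interface SimpleDagActionStore {
  void addDagAction(DagAction dagAction);
  boolean exists(DagAction dagAction);
  // True if one record was deleted, false if it did not exist.
  boolean deleteDagAction(DagAction dagAction);
  // Every action recorded for one flow execution; order is unspecified.
  List<DagAction> getDagActions(String flowGroup, String flowName, String flowExecutionId);
}

final class InMemoryDagActionStore implements SimpleDagActionStore {
  private final Set<DagAction> actions = ConcurrentHashMap.newKeySet();

  @Override public void addDagAction(DagAction dagAction) { actions.add(dagAction); }
  @Override public boolean exists(DagAction dagAction) { return actions.contains(dagAction); }
  @Override public boolean deleteDagAction(DagAction dagAction) { return actions.remove(dagAction); }

  @Override
  public List<DagAction> getDagActions(String flowGroup, String flowName, String flowExecutionId) {
    return actions.stream()
        .filter(a -> a.isForExecution(flowGroup, flowName, flowExecutionId))
        .collect(Collectors.toList());
  }
}
```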


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -151,20 +153,21 @@ private DagAction getDagActionWithRetry(String flowGroup, String flowName, Strin
       getStatement.setString(++i, flowGroup);
       getStatement.setString(++i, flowName);
       getStatement.setString(++i, flowExecutionId);
+      getStatement.setString(++i, dagActionValue.toString());

Review Comment:
   tip: if you want to use `DagAction` as a value type/POJO, you could even add 
a method to encapsulate this repetitive setup:
   ```
   void prepareStatement(PreparedStatement ps, int nextVarIndex)
   ```

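A sketch of that helper on a hypothetical `DagAction` value type. It binds the same four values, in the same order, as the `setString(++i, ...)` lines in `getDagActionWithRetry`. One small departure from the signature above: it returns the next free index instead of `void`, so callers can keep binding further parameters. Since the name overlaps with JDBC's `Connection.prepareStatement`, something like `bindTo` may read less ambiguously.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical value type; binding order matches the setString calls in the hunk above.
final class DagAction {
  enum DagActionValue { KILL, RESUME, LAUNCH }

  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final DagActionValue dagActionValue;

  DagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.dagActionValue = dagActionValue;
  }

  /**
   * Binds this action's four fields to consecutive JDBC parameters, starting at the
   * 1-based index nextVarIndex.
   * @return the index of the next unbound parameter
   */
  int prepareStatement(PreparedStatement ps, int nextVarIndex) throws SQLException {
    ps.setString(nextVarIndex++, flowGroup);
    ps.setString(nextVarIndex++, flowName);
    ps.setString(nextVarIndex++, flowExecutionId);
    ps.setString(nextVarIndex++, dagActionValue.toString());
    return nextVarIndex;
  }
}
```

Assuming `i` starts at 0 in `getDagActionWithRetry` (as it does in `deleteDagAction`), the four `setString(++i, ...)` calls would collapse to `dagAction.prepareStatement(getStatement, 1);`, and the same call would cover the delete statement's extra bind variable.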


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -440,7 +446,11 @@ public synchronized void scheduleJob(Properties jobProps, JobListener jobListene
   public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
     try {
       Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-      this.orchestrator.orchestrate(flowSpec);
+      String triggerTimestampMillis =
+          jobProps.containsKey(ConfigurationKeys.SCHEDULER_ORIGINAL_TRIGGER_TIMESTAMP_MILLIS_KEY)
+              ? jobProps.getProperty(ConfigurationKeys.SCHEDULER_ORIGINAL_TRIGGER_TIMESTAMP_MILLIS_KEY, "0L"):
+              jobProps.getProperty(ConfigurationKeys.SCHEDULER_TRIGGER_TIMESTAMP_MILLIS_KEY,"0L");
+      this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis));

Review Comment:
   not major, but seems worth raising the level of abstraction w/ a method like 
`extractTriggerTimestampMillis`

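A possible shape for `extractTriggerTimestampMillis` (the name is the reviewer's; the signature is an assumption). It takes the two property keys as parameters so the sketch doesn't depend on the values of the `ConfigurationKeys` constants. It also avoids a latent bug in the diff: `Long.parseLong` does not accept an `L` suffix, so the `"0L"` default throws `NumberFormatException` whenever the chosen key is absent; defaulting to `"0"` fixes that.

```java
import java.util.Properties;

final class TriggerTimestamps {
  private TriggerTimestamps() {}

  /**
   * Returns the original trigger timestamp if set, else the current trigger timestamp, else 0.
   * In runJob these keys would be ConfigurationKeys.SCHEDULER_ORIGINAL_TRIGGER_TIMESTAMP_MILLIS_KEY
   * and ConfigurationKeys.SCHEDULER_TRIGGER_TIMESTAMP_MILLIS_KEY.
   */
  static long extractTriggerTimestampMillis(Properties jobProps, String originalTriggerKey, String triggerKey) {
    String original = jobProps.getProperty(originalTriggerKey);
    // Default is "0", not "0L": Long.parseLong rejects the L suffix.
    String value = original != null ? original : jobProps.getProperty(triggerKey, "0");
    return Long.parseLong(value.trim());
  }
}
```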


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -310,6 +316,17 @@ public void orchestrate(Spec spec) throws Exception {
         flowCompilationTimer.get().stop(flowMetadata);
       }
 
+      // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
+      if (this.isMultiActiveSchedulerEnabled) {
+        String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+        boolean leaseAttemptSucceeded = schedulerLeaseAlgoHandler.handleNewTriggerEvent(jobProps, flowGroup, flowName,
+            flowExecutionId, SchedulerLeaseDeterminationStore.FlowActionType.LAUNCH, triggerTimestampMillis);
+        _log.info("scheduler attempted lease on flowGroup: %s, flowName: %s, flowExecutionId: %s, LAUNCH event for "
+            + "triggerTimestamp: %s that was " + (leaseAttemptSucceeded ? "" : "NOT") + "successful", flowGroup,

Review Comment:
   `"NOT"` => `"NOT "`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java:
##########
@@ -120,13 +121,14 @@ public void addDagAction(String flowGroup, String flowName, String flowExecution
       insertStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure to adding action for table %s of flow with flow group: %s, flow name"
+              + ": %s, flow execution id: %s, and dag action: %s", tableName, flowGroup, flowName, flowExecutionId,
+          dagActionValue), e);
     }
   }
 
   @Override
-  public boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId) throws IOException {
+  public boolean deleteDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionValue dagActionValue) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement deleteStatement = connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
       int i = 0;

Review Comment:
   don't you need to add the `dagActionValue`, now that there's another "bind 
variable" in the statement (i.e. the fourth "?" now)?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID

Review Comment:
   I respect using an enum for the tri-state result, but the three I had 
imagined are more like:
   * LEASE_IS_YOURS / LEASE_OBTAINED (acquired)
   * LEASED_TO_ANOTHER / LEASE_NOT_AVAILABLE
   * NO_LONGER_LEASING / LEASE_NOT_POSSIBLE (not necessary / there's nothing to 
lease)
   
   did you have the same three concepts in mind, just named differently--or 
different idea here?

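To make the reviewer's three outcomes concrete, a hypothetical sketch of how a store might classify a lease attempt, assuming each entry records when its lease was acquired and whether the flow action has completed, plus a fixed lease duration (none of which is defined in this PR). In this sketch an expired lease is simply re-acquired, so that case reports `LEASE_OBTAINED` rather than a separate `PREVIOUS_LEASE_EXPIRED`.

```java
// Hypothetical: the reviewer's three outcomes, not the enum in this PR.
enum LeaseAttemptStatus {
  LEASE_OBTAINED,    // caller now holds the lease and should perform the flow action
  LEASED_TO_ANOTHER, // another participant holds a lease that has not yet expired
  NO_LONGER_LEASING  // the flow action already completed; there is nothing left to lease
}

final class LeaseDecisions {
  private LeaseDecisions() {}

  /**
   * Pure classification; a real store would need to read and update the entry atomically.
   * @param leaseAcquiredAtMillis when the current lease was acquired, or null if none exists yet
   */
  static LeaseAttemptStatus classify(Long leaseAcquiredAtMillis, boolean actionCompleted,
      long nowMillis, long leaseDurationMillis) {
    if (actionCompleted) {
      return LeaseAttemptStatus.NO_LONGER_LEASING;
    }
    if (leaseAcquiredAtMillis == null || nowMillis - leaseAcquiredAtMillis >= leaseDurationMillis) {
      return LeaseAttemptStatus.LEASE_OBTAINED; // no lease yet, or the previous one expired
    }
    return LeaseAttemptStatus.LEASED_TO_ANOTHER;
  }
}
```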


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID
+  }
+
+  // Action to take on a particular flow
+  enum FlowActionType {
+    LAUNCH,
+    RETRY,
+    CANCEL,
+    NEXT_HOP

Review Comment:
   nit: I mentioned `ADVANCE` elsewhere, but `NEXT_HOP` is fine too.  as for 
`RETRY`, I believe `RESUME` is the terminology we've adopted pretty widely--or 
do you find precedent for `RETRY`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -310,6 +316,17 @@ public void orchestrate(Spec spec) throws Exception {
         flowCompilationTimer.get().stop(flowMetadata);
       }
 
+      // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
+      if (this.isMultiActiveSchedulerEnabled) {
+        String flowExecutionId = flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+        boolean leaseAttemptSucceeded = schedulerLeaseAlgoHandler.handleNewTriggerEvent(jobProps, flowGroup, flowName,
+            flowExecutionId, SchedulerLeaseDeterminationStore.FlowActionType.LAUNCH, triggerTimestampMillis);
+        _log.info("scheduler attempted lease on flowGroup: %s, flowName: %s, flowExecutionId: %s, LAUNCH event for "
+            + "triggerTimestamp: %s that was " + (leaseAttemptSucceeded ? "" : "NOT") + "successful", flowGroup,
+            flowName, flowExecutionId, triggerTimestampMillis);
+        return;

Review Comment:
   especially as there's an `if/else` just after, why not connect this block to 
that, rather than adding a premature `return` here?  (early returns like this can 
be more error-prone for maintainers.)

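A hypothetical sketch of the restructuring: the multi-active branch becomes the first arm of a single `if/else`, so no path exits early. The `else` branches that follow this hunk aren't visible in the review, so they are collapsed into a placeholder `submitToDagManager()`; `Steps` and its methods are illustrative, not Gobblin code.

```java
// Hypothetical sketch: Steps stands in for collaborators not visible in this hunk.
final class OrchestrateFlowSketch {
  interface Steps {
    boolean attemptLease();     // e.g. schedulerLeaseAlgoHandler.handleNewTriggerEvent(...)
    void submitToDagManager();  // placeholder for the existing non-multi-active branch(es)
    void log(String message);
  }

  private OrchestrateFlowSketch() {}

  static void orchestrate(boolean isMultiActiveSchedulerEnabled, Steps steps) {
    if (isMultiActiveSchedulerEnabled) {
      // Multi-active mode: attempt the lease instead of passing directly to the DagManager.
      boolean leaseAttemptSucceeded = steps.attemptLease();
      steps.log("multi-active LAUNCH: " + (leaseAttemptSucceeded ? "SUCCESS" : "FAILURE"));
    } else {
      steps.submitToDagManager();
    }
    // No early return above, so anything later added here runs on both paths.
  }
}
```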


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID
+  }
+
+  // Action to take on a particular flow
+  enum FlowActionType {
+    LAUNCH,
+    RETRY,
+    CANCEL,
+    NEXT_HOP
+  }
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow's trigger event if one does not already
+   * exist in the store for the same trigger event. Regardless of the outcome it also reads the pursuant timestamp of
+   * the entry for that trigger event (it could have pre-existed in the table or been newly added by the previous
+   * write). Based on the transaction results, it will return @LeaseAttemptStatus to determine the next action.
+   * @param flowGroup
+   * @param flowName
+   * @param flowExecutionId
+   * @param triggerTimeMillis is the time this flow is supposed to be launched
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName,

Review Comment:
   seems misleading to name `GetPursuantTimestamp`, when that doesn't appear to 
be returned to the caller.  that said, won't the participant who fails to 
acquire the lease want to know when the lease was granted, to use that in 
deciding when to try again?
   
   nit: "pursuing" is slightly closer than "pursuant" to the sense of what's 
happening here.
maintainers.
   
   alternatively what about simply naming this `acquireLease` or 
`tryAcquireLease`?
   

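A hypothetical in-memory sketch of a `tryAcquireLease` that returns both the outcome and the acquisition time of whichever lease is current, so a participant that loses could schedule its next check for roughly `leaseAcquiredAtMillis + leaseDurationMillis`. The class name borrows the reviewer's `MultiActiveLeaseArbiter` suggestion; the lease duration, key format, and two-valued status are assumptions, and a real implementation would perform the same check-and-set atomically in MySQL rather than in a `ConcurrentHashMap`.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; name borrowed from the review, not an existing Gobblin class.
final class InMemoryMultiActiveLeaseArbiter {
  enum Status { LEASE_OBTAINED, LEASED_TO_ANOTHER }

  /** The outcome plus when the currently valid lease was acquired. */
  static final class LeaseAttempt {
    final Status status;
    final long leaseAcquiredAtMillis;

    LeaseAttempt(Status status, long leaseAcquiredAtMillis) {
      this.status = status;
      this.leaseAcquiredAtMillis = leaseAcquiredAtMillis;
    }
  }

  private final long leaseDurationMillis;
  // Key: one flow action trigger; value: when its current lease was acquired.
  private final ConcurrentHashMap<String, Long> leaseAcquiredAt = new ConcurrentHashMap<>();

  InMemoryMultiActiveLeaseArbiter(long leaseDurationMillis) {
    this.leaseDurationMillis = leaseDurationMillis;
  }

  LeaseAttempt tryAcquireLease(String flowGroup, String flowName, String flowExecutionId,
      String flowActionType, long triggerTimeMillis, long nowMillis) {
    String key = String.join("|", flowGroup, flowName, flowExecutionId, flowActionType,
        Long.toString(triggerTimeMillis));
    boolean[] obtained = {false};
    // compute() is atomic per key, so this check-and-set cannot interleave with another caller's.
    long current = leaseAcquiredAt.compute(key, (k, previous) -> {
      if (previous == null || nowMillis - previous >= leaseDurationMillis) {
        obtained[0] = true;
        return nowMillis;
      }
      return previous;
    });
    return new LeaseAttempt(obtained[0] ? Status.LEASE_OBTAINED : Status.LEASED_TO_ANOTHER, current);
  }
}
```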


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,73 @@
+public interface SchedulerLeaseDeterminationStore {
+  static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseDeterminationStore.class);
+
+  // Enum is used to reason about the three possible scenarios that can result from an attempt to obtain a lease for a
+  // particular trigger event of a flow
+  enum LeaseAttemptStatus {
+    LEASE_OBTAINED,
+    PREVIOUS_LEASE_EXPIRED,
+    PREVIOUS_LEASE_VALID
+  }
+
+  // Action to take on a particular flow
+  enum FlowActionType {
+    LAUNCH,
+    RETRY,
+    CANCEL,
+    NEXT_HOP
+  }
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow's trigger event if one does not already
+   * exist in the store for the same trigger event. Regardless of the outcome it also reads the pursuant timestamp of
+   * the entry for that trigger event (it could have pre-existed in the table or been newly added by the previous
+   * write). Based on the transaction results, it will return @LeaseAttemptStatus to determine the next action.
+   * @param flowGroup
+   * @param flowName
+   * @param flowExecutionId
+   * @param triggerTimeMillis is the time this flow is supposed to be launched
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName,
+      String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis) throws IOException;
+
+  /**
+   * This method is used by `attemptInsertAndGetPursuantTimestamp` above to indicate the host has successfully completed

Review Comment:
   not having read further yet, the description here makes this sound like an 
internal impl detail.  does it need to be part of the public interface?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 862493)
    Time Spent: 20m  (was: 10m)

> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
>                 Key: GOBBLIN-1837
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active 
> scheduler for each host. It will NOT include metric emission or unit tests 
> for validation. That will be done in a separate follow-up ticket. The work in 
> this ticket includes
>  * define a table to do scheduler lease determination for each flow's trigger 
> event and related methods to execute actions on this table
>  * update DagActionStore schema and DagActionStoreMonitor to act upon new 
> "LAUNCH" type events in addition to KILL/RESUME
>  * update scheduler/orchestrator logic to apply the non-blocking algorithm 
> when "multi-active scheduler mode" is enabled, otherwise submit events 
> directly to the DagManager after receiving a scheduler trigger
>  * implement the non-blocking algorithm, particularly handling reminder 
> events if another host is in the process of securing the lease for a 
> particular flow trigger



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
