umustafi commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1227228937


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease
+ *        b) another has acquired the lease -> then another will attempt to 
complete the lease
+ *        c) flow event no longer needs to be acted upon -> terminal state
+ *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it
+ *    needs to attempt the lease again [status (b) above].
+ *  4. Once the instance which acquired the lease completes its work on the 
flow event, it calls completeLeaseUse to
+ *    indicate to all other instances that the flow event no longer needs to 
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+  static final Logger LOG = 
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow 
action event if one does not already
+   * exist in the store for the flow action or has expired. Regardless of the 
outcome it also reads the lease
+   * acquisition timestamp of the entry for that flow action event (it could 
have pre-existed in the table or been newly
+   *  added by the previous write). Based on the transaction results, it will 
return @LeaseAttemptStatus to determine
+   *  the next action.
+   * @param flowAction uniquely identifies the flow
+   * @param eventTimeMillis is the time this flow action should occur
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis) throws IOException;
+
+  /**
+   * This method is used to indicate the owner of the lease has successfully 
completed required actions while holding
+   * the lease of the flow action event. It marks the lease as "no longer 
leasing", if the eventTimeMillis and
+   * leaseAcquisitionTimeMillis values have not changed since this owner 
acquired the lease (indicating the lease did
+   * not expire).
+   * @return true if successfully updated, indicating no further actions need 
to be taken regarding this event.
+   */
+  boolean completeLeaseUse(DagActionStore.DagAction flowAction, long 
eventTimeMillis, long leaseAcquisitionTimeMillis)

Review Comment:
   I replaced the last two arguments with `LeaseObtainedStatus` but still kept 
the `DagActionStore.DagAction` so we have the ability to identify the flow for 
updating the record



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to