[
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=865028&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865028
]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jun/23 20:51
Start Date: 12/Jun/23 20:51
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1227228937
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole instance acting on the event by calling the
+ *    tryAcquireLease method below and receives the resulting status. The status indicates whether this instance has
+ *    a) acquired the lease -> then this instance will attempt to complete the lease
+ *    b) another has acquired the lease -> then another will attempt to complete the lease
+ *    c) flow event no longer needs to be acted upon -> terminal state
+ * 3. If another has acquired the lease, then the instance will check back in at the time of lease expiry to see if it
+ *    needs to attempt the lease again [status (b) above].
+ * 4. Once the instance which acquired the lease completes its work on the flow event, it calls completeLeaseUse to
+ *    indicate to all other instances that the flow event no longer needs to be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+ static final Logger LOG = LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+ /**
+ * This method attempts to insert an entry into the store for a particular flow action event if one does not already
+ * exist in the store for the flow action or has expired. Regardless of the outcome, it also reads the lease
+ * acquisition timestamp of the entry for that flow action event (it could have pre-existed in the table or been newly
+ * added by the previous write). Based on the transaction results, it returns a {@link LeaseAttemptStatus} that
+ * determines the next action.
+ * @param flowAction uniquely identifies the flow
+ * @param eventTimeMillis is the time this flow action should occur
+ * @return LeaseAttemptStatus
+ * @throws IOException
+ */
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException;
+
+ /**
+ * This method is used to indicate that the owner of the lease has successfully completed the required actions while
+ * holding the lease of the flow action event. It marks the lease as "no longer leasing" if the eventTimeMillis and
+ * leaseAcquisitionTimeMillis values have not changed since this owner acquired the lease (indicating the lease did
+ * not expire).
+ * @return true if successfully updated, indicating no further actions need to be taken regarding this event.
+ */
+ boolean completeLeaseUse(DagActionStore.DagAction flowAction, long eventTimeMillis, long leaseAcquisitionTimeMillis)
Review Comment:
I replaced the last two arguments with `LeaseObtainedStatus` but kept the
`DagActionStore.DagAction` argument so that we can still identify the flow when
updating the record.
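The lease protocol in the Javadoc can be sketched as a single-process arbiter. The sketch also applies the signature change from the review comment: the `DagAction` plus a `LeaseObtainedStatus` replace the two timestamps.

This is a minimal sketch under stated assumptions, not Gobblin's implementation:
- `FlowActionKey`, `InMemoryLeaseArbiter`, `LeasedToAnotherStatus`, `NoLongerLeasingStatus`, the fixed lease duration and the injectable clock are all hypothetical.
- An atomic `ConcurrentHashMap.compute` stands in for the insert-then-read transaction against the shared lease table.
- `throws IOException` is dropped because nothing is persisted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical stand-in for DagActionStore.DagAction: identifies one flow action.
record FlowActionKey(String flowGroup, String flowName, String actionType) {}

// Hypothetical result types for statuses (a), (b) and (c) in the interface Javadoc.
interface LeaseAttemptStatus {}
record LeaseObtainedStatus(FlowActionKey flowAction, long eventTimeMillis,
    long leaseAcquisitionTimeMillis) implements LeaseAttemptStatus {}
record LeasedToAnotherStatus(FlowActionKey flowAction, long eventTimeMillis,
    long millisUntilExpiry) implements LeaseAttemptStatus {}
record NoLongerLeasingStatus(FlowActionKey flowAction) implements LeaseAttemptStatus {}

// Single-process sketch; a real arbiter would use a store shared by all hosts.
final class InMemoryLeaseArbiter {
  // One row per flow action; a null leaseAcquisitionTimeMillis means "no longer leasing".
  private record Row(long eventTimeMillis, Long leaseAcquisitionTimeMillis) {}

  private final Map<FlowActionKey, Row> store = new ConcurrentHashMap<>();
  private final long leaseDurationMillis;
  private final LongSupplier clock;

  InMemoryLeaseArbiter(long leaseDurationMillis, LongSupplier clock) {
    this.leaseDurationMillis = leaseDurationMillis;
    this.clock = clock;
  }

  LeaseAttemptStatus tryAcquireLease(FlowActionKey flowAction, long eventTimeMillis) {
    long now = clock.getAsLong();
    boolean[] acquired = {false};
    // compute() is atomic per key: insert-if-absent-or-expired, then read the resulting row.
    Row row = store.compute(flowAction, (key, existing) -> {
      boolean canAcquire = existing == null                      // no entry yet
          || (existing.leaseAcquisitionTimeMillis() != null       // current lease has expired
              && now >= existing.leaseAcquisitionTimeMillis() + leaseDurationMillis)
          || (existing.leaseAcquisitionTimeMillis() == null       // a newer event arrives after
              && eventTimeMillis > existing.eventTimeMillis());   // the previous one completed
      if (!canAcquire) {
        return existing;
      }
      acquired[0] = true;
      return new Row(eventTimeMillis, now);
    });
    if (acquired[0]) {
      return new LeaseObtainedStatus(flowAction, eventTimeMillis, now);
    }
    if (row.leaseAcquisitionTimeMillis() == null) {
      return new NoLongerLeasingStatus(flowAction);
    }
    long millisUntilExpiry = row.leaseAcquisitionTimeMillis() + leaseDurationMillis - now;
    return new LeasedToAnotherStatus(flowAction, row.eventTimeMillis(), millisUntilExpiry);
  }

  // Revised signature from the review comment: the key identifies the row, and the
  // LeaseObtainedStatus carries the eventTimeMillis / leaseAcquisitionTimeMillis pair.
  boolean completeLeaseUse(FlowActionKey flowAction, LeaseObtainedStatus status) {
    boolean[] updated = {false};
    store.computeIfPresent(flowAction, (key, row) -> {
      boolean unchanged = row.leaseAcquisitionTimeMillis() != null
          && row.eventTimeMillis() == status.eventTimeMillis()
          && row.leaseAcquisitionTimeMillis() == status.leaseAcquisitionTimeMillis();
      if (!unchanged) {
        return row; // re-acquired by another instance, or already completed
      }
      updated[0] = true;
      return new Row(row.eventTimeMillis(), null);
    });
    return updated[0];
  }
}

class LeaseArbiterDemo {
  public static void main(String[] args) {
    long[] now = {1_000L}; // manual clock shared by two simulated hosts
    InMemoryLeaseArbiter arbiter = new InMemoryLeaseArbiter(500L, () -> now[0]);
    FlowActionKey launch = new FlowActionKey("myGroup", "myFlow", "LAUNCH");

    LeaseAttemptStatus hostA = arbiter.tryAcquireLease(launch, 900L); // status (a): obtained
    now[0] = 1_100L;
    System.out.println(arbiter.tryAcquireLease(launch, 900L)); // status (b): 400 ms until expiry
    System.out.println(arbiter.completeLeaseUse(launch, (LeaseObtainedStatus) hostA)); // true
    System.out.println(arbiter.tryAcquireLease(launch, 900L)); // status (c): no longer leasing
  }
}
```

Checking both timestamps in `completeLeaseUse` lets a slow owner detect that its lease expired and was re-acquired elsewhere. In that case its stale `LeaseObtainedStatus` no longer matches the row, so the completion returns false.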
Issue Time Tracking
-------------------
Worklog Id: (was: 865028)
Time Spent: 12h (was: 11h 50m)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 12h
> Remaining Estimate: 0h
>
> This task will include the implementation of a non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation; those will be done in a separate follow-up ticket. The work in
> this ticket includes:
> * define a table for scheduler lease determination for each flow's trigger
>   event, and related methods to execute actions on this table
> * update the DagActionStore schema and DagActionStoreMonitor to act upon new
>   "LAUNCH" type events in addition to KILL/RESUME
> * update the scheduler/orchestrator logic to apply the non-blocking algorithm
>   when "multi-active scheduler mode" is enabled; otherwise, submit events
>   directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, in particular handling reminder
>   events if another host is in the process of securing the lease for a
>   particular flow trigger
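The last two items describe a scheduler-side flow, which can be sketched as follows:
- If multi-active mode is off, submit directly to the DagManager.
- Otherwise, attempt the lease; the winner records a LAUNCH action, and every other host schedules a reminder for the lease expiry.

This is a minimal sketch under stated assumptions, not the Gobblin implementation:
- `FlowTriggerHandler`, `FlowLeaseArbiter`, `LeaseOutcome`, `LeaseAttempt` and `ReminderScheduler` are hypothetical names.
- The DagManager and DagActionStore are reduced to `Consumer<String>` callbacks.
- Completing the lease right after recording the LAUNCH action is an assumed ordering that the ticket does not specify.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified view of a lease attempt result (statuses (a), (b), (c)).
enum LeaseOutcome { OBTAINED, LEASED_TO_ANOTHER, NO_LONGER_LEASING }

record LeaseAttempt(LeaseOutcome outcome, long leaseAcquisitionTimeMillis, long millisUntilExpiry) {}

// Hypothetical arbiter view keyed by a flow id string instead of a DagAction.
interface FlowLeaseArbiter {
  LeaseAttempt tryAcquireLease(String flowId, long eventTimeMillis);
  boolean completeLeaseUse(String flowId, long eventTimeMillis, long leaseAcquisitionTimeMillis);
}

// Schedules a reminder; of() adapts a real ScheduledExecutorService.
interface ReminderScheduler {
  void schedule(Runnable task, long delayMillis);

  static ReminderScheduler of(ScheduledExecutorService executor) {
    return (task, delayMillis) -> executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
  }
}

final class FlowTriggerHandler {
  private final boolean multiActiveSchedulerEnabled;
  private final FlowLeaseArbiter arbiter;
  private final Consumer<String> submitToDagManager; // direct path when the mode is off
  private final Consumer<String> recordLaunchAction; // e.g. persist a LAUNCH dag action
  private final ReminderScheduler reminders;

  FlowTriggerHandler(boolean multiActiveSchedulerEnabled, FlowLeaseArbiter arbiter,
      Consumer<String> submitToDagManager, Consumer<String> recordLaunchAction,
      ReminderScheduler reminders) {
    this.multiActiveSchedulerEnabled = multiActiveSchedulerEnabled;
    this.arbiter = arbiter;
    this.submitToDagManager = submitToDagManager;
    this.recordLaunchAction = recordLaunchAction;
    this.reminders = reminders;
  }

  void handleTrigger(String flowId, long eventTimeMillis) {
    if (!multiActiveSchedulerEnabled) {
      submitToDagManager.accept(flowId);
      return;
    }
    LeaseAttempt attempt = arbiter.tryAcquireLease(flowId, eventTimeMillis);
    switch (attempt.outcome()) {
      case OBTAINED -> {
        recordLaunchAction.accept(flowId);
        // A false return means the lease expired and was re-acquired elsewhere;
        // deduplicating the resulting LAUNCH actions is out of scope for this sketch.
        arbiter.completeLeaseUse(flowId, eventTimeMillis, attempt.leaseAcquisitionTimeMillis());
      }
      // Reminder: retry the same trigger (same eventTimeMillis) once the other lease expires.
      case LEASED_TO_ANOTHER -> reminders.schedule(
          () -> handleTrigger(flowId, eventTimeMillis), attempt.millisUntilExpiry());
      case NO_LONGER_LEASING -> { } // another instance already finished this event
    }
  }
}
```

In production the reminder scheduler could be `ReminderScheduler.of(Executors.newSingleThreadScheduledExecutor())`. Injecting it instead keeps the retry logic testable without real delays. Because each reminder reuses the original `eventTimeMillis`, a retry targets the same flow trigger rather than a new event.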
--
This message was sent by Atlassian Jira
(v8.20.10#820010)