[
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=865034&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865034
]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jun/23 21:27
Start Date: 12/Jun/23 21:27
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1227261902
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
Review Comment:
I tried to use boolean value for it but if the column is NULL then the
boolean returned is `false` so it becomes hard to distinguish between
leaseValid and noLease. I end up having to specially define the no lease case.
Issue Time Tracking
-------------------
Worklog Id: (was: 865034)
Time Spent: 12h 10m (was: 12h)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 12h 10m
> Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation. That will be done in a separate follow-up ticket. The work in
> this ticket includes
> * define a table to do scheduler lease determination for each flow's trigger
> event and related methods to execute actions on this tableĀ
> * update DagActionStore schema and DagActionStoreMonitor to act upon new
> "LAUNCH" type events in addition to KILL/RESUME
> * update scheduler/orchestrator logic to apply the non-blocking algorithm
> when "multi-active scheduler mode" is enabled, otherwise submit events
> directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, particularly handling reminder
> events if another host is in the process of securing the lease for a
> particular flow trigger
--
This message was sent by Atlassian Jira
(v8.20.10#820010)