[ 
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=865732&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865732
 ]

ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jun/23 08:45
            Start Date: 15/Jun/23 08:45
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1230649070


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -364,6 +374,27 @@ public void orchestrate(Spec spec) throws Exception {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
+  public void submitFlowToDagManager(FlowSpec flowSpec)
+      throws IOException {
+    submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+  }
+  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> 
jobExecutionPlanDag)

Review Comment:
   missing line between method defs



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to 
respond to flow action events. It uses the
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a 
given time
+ * for a flow action event. After acquiring the lease, it persists the flow 
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it 
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link MysqlMultiActiveLeaseArbiter.recordLeaseSuccess()} method. Hosts 
that do not gain the lease for the event,
+ * instead schedule a reminder using the {@link SchedulerService} to check 
back in on the previous lease owner's
+ * completion status after the lease should expire to ensure the event is 
handled in failure cases.
+ */
+@Slf4j
+public class FlowTriggerHandler {
+  private final int schedulerMaxBackoffMillis;
+  private static Random random = new Random();
+  protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  protected SchedulerService schedulerService;
+  protected DagActionStore dagActionStore;
+  private MetricContext metricContext;
+  private ContextAwareMeter numFlowsSubmitted;
+
+  @Inject
+  public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter 
leaseDeterminationStore,
+      SchedulerService schedulerService, DagActionStore dagActionStore) {
+    this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
+    this.multiActiveLeaseArbiter = leaseDeterminationStore;
+    this.schedulerService = schedulerService;
+    this.dagActionStore = dagActionStore;
+    this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+        this.getClass());
+    this.numFlowsSubmitted = 
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
+  }
+
+  /**
+   * This method is used in the multi-active scheduler case for one or more 
hosts to respond to a flow action event
+   * by attempting a lease for the flow event and processing the result 
depending on the status of the attempt.
+   * @param jobProps
+   * @param flowAction
+   * @param eventTimeMillis
+   * @throws IOException
+   */
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
+      throws IOException {
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
+        multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+    // TODO: add a log event or metric for each of these cases
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+      if (persistFlowAction(leaseObtainedStatus)) {
+        return;
+      }
+      // If persisting the flow action failed, then we set another trigger for 
this event to occur immediately to
+      // re-attempt handling the event
+      scheduleReminderForEvent(jobProps, new 
MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
+          leaseObtainedStatus.getEventTimestamp(), 0L), eventTimeMillis);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
+          eventTimeMillis);
+      return;
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
+    }
+    throw new RuntimeException(String.format("Received type of 
leaseAttemptStatus: %s not handled by this method",
+            leaseAttemptStatus.getClass().getName()));
+  }
+
+  // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status) {

Review Comment:
   minor, but IMO `lease` seems much clearer than `status`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -93,7 +105,7 @@ protected void assignTopicPartitions() {
 
   @Override
   /*
-  This class is multi-threaded and this message will be called by multiple 
threads, however any given message will be
+  This class is multithreaded and this message will be called by multiple 
threads, however any given message will be

Review Comment:
   "this *message* will be called", or "this *method* will be called"?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which 
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing participants. A MySQL table is used 
to store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values 
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease, 
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp 
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that 
allow us to coordinate between participants and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to 
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on 
a flow event. It should be much greater
+ *            than epsilon and encapsulate executor communication latency 
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event 
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH, 
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution 
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and 
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most 
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of 
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt 
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark 
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+  /** `j.u.Function` variant for an operation that may @throw IOException or 
SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  protected interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  protected final DataSource dataSource;
+  private final String leaseArbiterTableName;
+  private final String constantsTableName;
+  private final int epsilon;
+  private final int linger;
+
+  // TODO: define retention on this table
+  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %S ("
+      + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
+      + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + 
"flow_execution_id varchar("
+      + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, 
flow_action varchar(100) NOT NULL, "
+      + "event_timestamp TIMESTAMP, "
+      + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+      + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+  private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
+      + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO 
%s (epsilon, linger) VALUES (?,?)";
+  protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_execution_id=?"
+      + " AND flow_action=?";
+  protected static final String WHERE_CLAUSE_TO_MATCH_ROW = 
WHERE_CLAUSE_TO_MATCH_KEY
+      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
ROW_COUNT() AS rows_inserted_count, "
+      + "lease_acquisition_timestamp, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+  // Does a cross join between the two tables to have epsilon and linger 
values available. Returns the following values:
+  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
event_timestamp in table is within
+  // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired, 
3 if column is NULL or no longer leasing)
+  protected static final String GET_EVENT_INFO_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
+      + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+      + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then 
1"
+      + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then 
2"
+      + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+  // Insert or update row to acquire lease if values have not changed since 
the previous read
+  // Need to define three separate statements to handle cases where row does 
not exist or has null values to check
+  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+      + "(flow_group, flow_name, flow_execution_id, flow_action, 
event_timestamp) VALUES (?, ?, ?, ?, ?)";
+  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+      + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+      + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
+  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+      + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+  // Complete lease acquisition if values have not changed since lease was 
acquired
+  protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = 
"UPDATE %s SET "
+      + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+  @Inject
+  public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
+    if (config.hasPath(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX)) {
+      config = 
config.getConfig(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX).withFallback(config);
+    } else {
+      throw new IOException(String.format("Please specify the config for 
MysqlMultiActiveLeaseArbiter using prefix %s "
+          + "before all properties", 
ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX));
+    }
+
+    this.leaseArbiterTableName = ConfigUtils.getString(config, 
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+        
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+    this.constantsTableName = ConfigUtils.getString(config, 
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
+        ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+    this.epsilon = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
+    this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+    this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
+            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+      createStatement.executeUpdate();
+      connection.commit();
+    } catch (SQLException e) {
+      throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
+    }
+    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
+        createStatement -> {
+      int i = 0;
+      createStatement.setInt(++i, epsilon);
+      createStatement.setInt(++i, linger);
+      return createStatement.executeUpdate();}, true);
+  }
+
+  @Override
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
+      throws IOException {
+    // Check table for an existing entry for this flow action and event time
+    ResultSet resultSet = withPreparedStatement(
+        String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, 
this.constantsTableName),
+        getInfoStatement -> {
+          int i = 0;
+          getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
+          getInfoStatement.setString(++i, flowAction.getFlowGroup());
+          getInfoStatement.setString(++i, flowAction.getFlowName());
+          getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
+          getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
+          return getInfoStatement.executeQuery();
+        }, true);
+
+    try {
+      // CASE 1: If no existing row for this flow action, then go ahead and 
insert
+      if (!resultSet.next()) {
+        ResultSet rs = withPreparedStatement(
+            String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT + 
"; " + SELECT_AFTER_INSERT_STATEMENT,

Review Comment:
   personally, for clarity's sake, I'd `format` each separately and then join 
them via ";"





Issue Time Tracking
-------------------

    Worklog Id:     (was: 865732)
    Time Spent: 14h 40m  (was: 14.5h)

> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
>                 Key: GOBBLIN-1837
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 14h 40m
>  Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active 
> scheduler for each host. It will NOT include metric emission or unit tests 
> for validation. That will be done in a separate follow-up ticket. The work in 
> this ticket includes
>  * define a table to do scheduler lease determination for each flow's trigger 
> event and related methods to execute actions on this tableĀ 
>  * update DagActionStore schema and DagActionStoreMonitor to act upon new 
> "LAUNCH" type events in addition to KILL/RESUME
>  * update scheduler/orchestrator logic to apply the non-blocking algorithm 
> when "multi-active scheduler mode" is enabled, otherwise submit events 
> directly to the DagManager after receiving a scheduler trigger
>  * implement the non-blocking algorithm, particularly handling reminder 
> events if another host is in the process of securing the lease for a 
> particular flow trigger



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to