[
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=864298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-864298
]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jun/23 22:31
Start Date: 07/Jun/23 22:31
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1222050277
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java:
##########
@@ -26,4 +26,5 @@ public final class InjectionNames {
public static final String FORCE_LEADER = "forceLeader";
public static final String FLOW_CATALOG_LOCAL_COMMIT =
"flowCatalogLocalCommit";
public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
Review Comment:
shall we add a TODO to rename "WARM_STANDBY", since it's a misnomer? happily,
it's not the actual value of the underlying config name, so it should be
easy to replace.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java:
##########
@@ -56,7 +64,7 @@ private DagActionStoreChangeMonitor
createDagActionStoreMonitor()
String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
- return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagActionStore, this.dagManager, numThreads);
+ return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagActionStore, this.dagManager, numThreads,
isMultiActiveSchedulerEnabled, flowCatalog);
Review Comment:
minor, but the ordering of the two new params is switched between the
factory and the ctor of what it creates. I suggest aligning.
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Scheduler lease determination store configuration
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "scheduler.lease.determination.store.db.table";
+ public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
+ public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY =
"reminderEventTimestampMillis";
+ public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY =
"newEventTimestampMillis";
+ public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+ public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
+ // Note: linger should be on the order of seconds even though we measure in
millis
+ public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = "";
+ public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+ public static final String SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY = "";
+ public static final int DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC = 5;
Review Comment:
those above are in millis... might it be confusing to switch to secs for
this one?
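
One way to sidestep the unit mismatch, sketched here under assumptions: keep every scheduler timing default in milliseconds and convert only at the point of use. The class name `SchedulerTimingDefaults` and the constant `DEFAULT_SCHEDULER_MAX_DELAY_MILLIS` are hypothetical and do not appear in the PR; the numeric values mirror the defaults in the hunk above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class for illustration; not part of the PR.
final class SchedulerTimingDefaults {
  // Values copied from the ConfigurationKeys hunk above.
  static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
  static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
  // Hypothetical rename: the 5-second staggering bound expressed in millis.
  static final int DEFAULT_SCHEDULER_MAX_DELAY_MILLIS = 5000;

  private SchedulerTimingDefaults() {}

  // Convert at the call site when a coarser unit is needed.
  static long maxDelaySeconds() {
    return TimeUnit.MILLISECONDS.toSeconds(DEFAULT_SCHEDULER_MAX_DELAY_MILLIS);
  }

  public static void main(String[] args) {
    System.out.println("max delay (s): " + maxDelaySeconds());
  }
}
```

With a single unit across the three defaults, a reader comparing epsilon, linger, and the delay bound does not have to convert in their head.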
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within which we consider two timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
Review Comment:
spelled differently than `MysqlDagActionStore`. the latter capitalization
seems prevalent in our code base
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeasedToAnotherStatus.java:
##########
@@ -0,0 +1,39 @@
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The event in question has been leased to another. This object contains
`reminderEventTimestamp` which is the event
+timestamp the lease is associated with as well as `minimumReminderWaitMillis`
the minimum amount of time to wait
+before returning to check if the lease has completed or expired.
+ */
+public class LeasedToAnotherStatus extends LeaseAttemptStatus {
+ @Getter
+ private final long reminderEventTimeMillis;
Review Comment:
merely `eventTimestamp` is preferable here, since that aligns w/ the naming
in `LeaseObtainedStatus`
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Scheduler lease determination store configuration
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "scheduler.lease.determination.store.db.table";
+ public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
+ public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY =
"reminderEventTimestampMillis";
+ public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY =
"newEventTimestampMillis";
+ public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+ public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
+ // Note: linger should be on the order of seconds even though we measure in
millis
+ public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = "";
+ public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+ public static final String SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY = "";
Review Comment:
`UPPER_BOUND` => `MAX` ?
`STAGGERING` => `DELAY` ?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -580,7 +581,9 @@ public void run() {
private void clearUpDagAction(DagId dagId) throws IOException {
if (this.dagActionStore.isPresent()) {
- this.dagActionStore.get().deleteDagAction(dagId.flowGroup,
dagId.flowName, dagId.flowExecutionId);
+ this.dagActionStore.get().deleteDagAction(
+ new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId,
+ DagActionStore.FlowActionType.KILL));
Review Comment:
how do we know for sure that this would be a `FlowActionType.KILL`? (may be
worth a code comment)
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Scheduler lease determination store configuration
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
Review Comment:
is this supposed to be the default name for the db table? if so, should it
have '.' in it? (I'm going by what I see for
`DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE`)
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -72,6 +73,8 @@ public class RuntimeMetrics {
public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.totalGetSpecTimeNanos";
public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.totalAddSpecTimeNanos";
public static final String
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.numJobsScheduledDuringStartup";
+ // Metrics Used to Track SchedulerLeaseAlgoHandlerProgress
+ public static final String
GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".schedulerLeaseAlgoHandler.numLeasesCompleted";
Review Comment:
nit: what does it mean to "complete" a lease? for this host to be the one
to get it... or for any host to get it? for the lease to expire? to actually
accomplish something before it expired?
let's seek a different word to unambiguously articulate what's measured.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -440,12 +446,24 @@ public synchronized void scheduleJob(Properties jobProps,
JobListener jobListene
public void runJob(Properties jobProps, JobListener jobListener) throws
JobException {
try {
Spec flowSpec =
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
- this.orchestrator.orchestrate(flowSpec);
+ String triggerTimestampMillis = extractTriggerTimestampMillis(jobProps);
Review Comment:
just wondering... what are the ramifications of neither of the keys being
defined so the trigger timestamp winds up as `0L`?
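
To make the "neither key defined" case explicit rather than letting it collapse to `0L`, a helper could return an `OptionalLong`. This is only a sketch: `TriggerTimestamps.extract` is a hypothetical stand-in (the PR's `extractTriggerTimestampMillis` is not shown in this hunk), and preferring the reminder key over the new-event key is an assumption. The key strings are the ones from the `ConfigurationKeys` hunk.

```java
import java.util.OptionalLong;
import java.util.Properties;

// Hypothetical helper for illustration; not the PR's implementation.
final class TriggerTimestamps {
  // Key strings as they appear in the ConfigurationKeys hunk.
  static final String REMINDER_KEY = "reminderEventTimestampMillis";
  static final String NEW_EVENT_KEY = "newEventTimestampMillis";

  private TriggerTimestamps() {}

  // Assumed precedence: reminder timestamp first, then the new-event timestamp.
  // Returns empty when neither key is set, so callers must handle that case.
  static OptionalLong extract(Properties jobProps) {
    String raw = jobProps.getProperty(REMINDER_KEY, jobProps.getProperty(NEW_EVENT_KEY));
    return raw == null ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(raw.trim()));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println("present without keys: " + extract(props).isPresent());
    props.setProperty(NEW_EVENT_KEY, "1686177060000");
    System.out.println("timestamp: " + extract(props).getAsLong());
  }
}
```

The caller then decides what a missing timestamp means (skip, log, or fail) instead of silently treating the event as having occurred at the epoch.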
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Scheduler lease determination store configuration
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "scheduler.lease.determination.store.db.table";
+ public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
+ public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY =
"reminderEventTimestampMillis";
+ public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY =
"newEventTimestampMillis";
+ public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
Review Comment:
empty string here (and for linger and staggering_upper_bound)?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
- // If an existing resume or kill request is still pending then do not
accept this request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString())) {
- DagActionStore.DagActionValue action =
this.dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId.toString()).getDagActionValue();
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
- new RuntimeException("There is already a pending action " + action
+ " for this flow. Please wait to resubmit and wait for"
+ // If an existing resume request is still pending then do not accept
this request
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME,
+ new RuntimeException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait for"
Review Comment:
the signature of `handleException` is encouraging this anti-pattern of
constructing an `Exception` just to tunnel through a message. let's change the
signature to take a `String message` as the final param, and push the
invocation of `.getMessage()` up to the caller.
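
A rough sketch of the suggested shape, under assumptions: `PendingActionHandler` is a hypothetical stand-in for the resource handler, and returning the formatted string is only for illustration, since the real method's body is not shown in this hunk. The point is the final `String message` parameter.

```java
// Hypothetical stand-in for the resource handler; only the method shape matters.
final class PendingActionHandler {
  // Reduced copy of the action enum, just enough for the example.
  enum FlowActionType { KILL, RESUME }

  private PendingActionHandler() {}

  // Takes the message directly, so callers need not construct an Exception to carry it.
  static String handleException(String flowGroup, String flowName, String flowExecutionId,
      FlowActionType actionType, String message) {
    return String.format("Failed %s for flow [%s, %s, %s]: %s",
        actionType, flowGroup, flowName, flowExecutionId, message);
  }

  public static void main(String[] args) {
    // A caller that only has a message passes it as-is.
    System.out.println(handleException("fg", "fn", "123", FlowActionType.RESUME,
        "There is already a pending RESUME action for this flow."));
    // A caller that does hold an exception extracts the message itself.
    try {
      throw new java.io.IOException("store unavailable");
    } catch (java.io.IOException e) {
      System.out.println(handleException("fg", "fn", "123", FlowActionType.KILL, e.getMessage()));
    }
  }
}
```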
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -71,17 +88,38 @@ public String load(String key) throws Exception {
protected DagActionStore dagActionStore;
protected DagManager dagManager;
+ protected SpecCompiler specCompiler;
+ protected boolean isMultiActiveSchedulerEnabled;
+ protected FlowCatalog flowCatalog;
+ protected EventSubmitter eventSubmitter;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config,
DagActionStore dagActionStore, DagManager dagManager,
- int numThreads) {
+ int numThreads, boolean isMultiActiveSchedulerEnabled, FlowCatalog
flowCatalog) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
numThreads);
this.dagActionStore = dagActionStore;
this.dagManager = dagManager;
+ ClassAliasResolver aliasResolver = new
ClassAliasResolver(SpecCompiler.class);
+ try {
+ String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+ if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+ specCompilerClassName =
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+ }
+ log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+ this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(
+ specCompilerClassName)), config);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
+ | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
if avoidable, I'd prefer to neither initialize nor maintain an additional
`SpecCompiler` here, apart from the one already in the `Orchestrator`. what
would it look like if we initialized this class w/ the `Orchestrator` and then
added a method to the latter handling essentially the `submitFlowToDagManager`
defined just below?
although you might still require a `FlowCatalog` on hand, the
`EventSubmitter` could go away, since it too would be encapsulated within the `Orchestrator`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -177,15 +207,46 @@ protected void processMessage(DecodeableKafkaRecord
message) {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
+ protected void submitFlowToDagManager(String flowGroup, String flowName) {
+ // Retrieve job execution plan by recompiling the flow spec to send to the
DagManager
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ FlowSpec spec = null;
+ try {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
+ //Send the dag to the DagManager.
+ dagManager.addDag(jobExecutionPlanDag, true, true);
+ } catch (URISyntaxException e) {
+ log.warn("Could not create URI object for flowId {} due to error {}",
flowId, e.getMessage());
+ this.unexpectedErrors.mark();
+ return;
+ } catch (SpecNotFoundException e) {
+ log.warn("Spec not found for flow group: {} name: {} Exception: {}",
flowGroup, flowName, e);
+ this.unexpectedErrors.mark();
+ return;
+ } catch (IOException e) {
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(spec);
+ String failureMessage = "Failed to add Job Execution Plan due to: " +
e.getMessage();
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
+ new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ log.warn("Failed to add Job Execution Plan for flow group: {} name: {}
due to error {}", flowGroup, flowName, e);
Review Comment:
again, `flowId`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -20,28 +20,31 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
+
+import lombok.Data;
public interface DagActionStore {
- enum DagActionValue {
- KILL,
- RESUME
+ enum FlowActionType {
+ KILL, // Kill invoked through API call
+ RESUME, // Resume flow invoked through API call
+ LAUNCH, // Launch new flow execution invoked adhoc or through scheduled
trigger
+ RETRY, // Invoked through DagManager for flows configured to allow retries
+ CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
+ ADVANCE // Launch next step in multi-hop dag
}
- @Getter
- @EqualsAndHashCode
+ @Data
class DagAction {
String flowGroup;
String flowName;
String flowExecutionId;
- DagActionValue dagActionValue;
- public DagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionValue dagActionValue) {
+ FlowActionType flowActionType;
+ public DagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) {
this.flowGroup = flowGroup;
this.flowName = flowName;
this.flowExecutionId = flowExecutionId;
- this.dagActionValue = dagActionValue;
+ this.flowActionType = flowActionType;
}
Review Comment:
I actually thought `@Data` would synthesize such a ctor, but if not, then
just add `@AllArgsConstructor`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
+ * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
+ * a) acquired the lease -> then this instance will attempt to complete
the lease
+ * b) another has acquired the lease -> then another will attempt to
complete the lease
+ * c) flow event no longer needs to be acted upon -> terminal state
+ * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
+ * needs to attempt the lease again [status (b) above].
+ * 4. Once the instance which acquired the lease completes its work on the
flow event, it calls completeLeaseUse to
+ * indicate to all other instances that the flow event no longer needs to
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+ static final Logger LOG =
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
Review Comment:
instead use `@Slf4j`
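
For readers following the Javadoc in the hunk above, the lease protocol it describes can be sketched in memory. Everything here is an assumption made for illustration: the store is a `HashMap` rather than MySQL, `NO_LONGER_LEASING` is an assumed name for the third outcome, the class and method shapes are hypothetical, and the treatment of a distinct later event is a simplification rather than the PR's logic.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration only; the PR's arbiter is backed by MySQL.
final class InMemoryLeaseArbiterSketch {
  // Outcomes (a), (b), (c) from the Javadoc; NO_LONGER_LEASING is an assumed name.
  enum Outcome { LEASE_OBTAINED, LEASED_TO_ANOTHER, NO_LONGER_LEASING }

  private static final class Entry {
    final long eventTimestampMillis;
    Long leaseAcquisitionMillis; // null once the work is done

    Entry(long eventTimestampMillis, long leaseAcquisitionMillis) {
      this.eventTimestampMillis = eventTimestampMillis;
      this.leaseAcquisitionMillis = leaseAcquisitionMillis;
    }
  }

  private final Map<String, Entry> store = new HashMap<>();
  private final long epsilonMillis; // timestamps this close count as the same event
  private final long lingerMillis;  // how long a lease stays valid

  InMemoryLeaseArbiterSketch(long epsilonMillis, long lingerMillis) {
    this.epsilonMillis = epsilonMillis;
    this.lingerMillis = lingerMillis;
  }

  synchronized Outcome tryAcquireLease(String flowActionKey, long eventMillis, long nowMillis) {
    Entry entry = store.get(flowActionKey);
    if (entry == null) {
      store.put(flowActionKey, new Entry(eventMillis, nowMillis));
      return Outcome.LEASE_OBTAINED;
    }
    boolean sameEvent = Math.abs(eventMillis - entry.eventTimestampMillis) <= epsilonMillis;
    if (entry.leaseAcquisitionMillis == null) {
      if (sameEvent) {
        return Outcome.NO_LONGER_LEASING; // already handled by someone
      }
      // Simplification: a distinct event after completion starts a fresh lease.
      store.put(flowActionKey, new Entry(eventMillis, nowMillis));
      return Outcome.LEASE_OBTAINED;
    }
    if (nowMillis - entry.leaseAcquisitionMillis < lingerMillis) {
      return Outcome.LEASED_TO_ANOTHER; // lease still valid; check back after expiry
    }
    // Lease expired without completion: this caller takes over.
    store.put(flowActionKey, new Entry(eventMillis, nowMillis));
    return Outcome.LEASE_OBTAINED;
  }

  // Marks the work as done so later attempts for the same event stop leasing.
  synchronized void completeLeaseUse(String flowActionKey) {
    Entry entry = store.get(flowActionKey);
    if (entry != null) {
      entry.leaseAcquisitionMillis = null;
    }
  }

  public static void main(String[] args) {
    InMemoryLeaseArbiterSketch arbiter = new InMemoryLeaseArbiterSketch(100, 30000);
    System.out.println(arbiter.tryAcquireLease("FOO:LAUNCH", 1000, 1000));
    System.out.println(arbiter.tryAcquireLease("FOO:LAUNCH", 1050, 1060));
    arbiter.completeLeaseUse("FOO:LAUNCH");
    System.out.println(arbiter.tryAcquireLease("FOO:LAUNCH", 1050, 1100));
  }
}
```

Passing `nowMillis` in explicitly keeps the sketch deterministic; a real implementation would presumably rely on the database clock so that all hosts agree on time.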
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
+ * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
+ * a) acquired the lease -> then this instance will attempt to complete
the lease
+ * b) another has acquired the lease -> then another will attempt to
complete the lease
+ * c) flow event no longer needs to be acted upon -> terminal state
+ * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
+ * needs to attempt the lease again [status (b) above].
+ * 4. Once the instance which acquired the lease completes its work on the
flow event, it calls completeLeaseUse to
+ * indicate to all other instances that the flow event no longer needs to
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+ static final Logger LOG =
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+ /**
+ * This method attempts to insert an entry into store for a particular flow
action event if one does not already
+ * exist in the store for the flow action or has expired. Regardless of the
outcome it also reads the lease
+ * acquisition timestamp of the entry for that flow action event (it could
have pre-existed in the table or been newly
+ * added by the previous write). Based on the transaction results, it will
return @LeaseAttemptStatus to determine
Review Comment:
`{@link LeaseAttemptStatus}`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -20,28 +20,31 @@
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
+
+import lombok.Data;
public interface DagActionStore {
- enum DagActionValue {
- KILL,
- RESUME
+ enum FlowActionType {
Review Comment:
better name--nice!
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
Review Comment:
suggestions:
"instances" => "participants" (to avoid OO connotations)
"over ownership of" => "to take responsibility for"
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
+ * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
+ * a) acquired the lease -> then this instance will attempt to complete
the lease
Review Comment:
"to complete the lease" => "to carry out the required action before the
lease expires"
for clarity's sake, I'd suggest stating the class that each of the three
corresponds to, as in:
"acquired the lease (`LeaseObtainedStatus` returned) -> ..."
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
- // If an existing resume or kill request is still pending then do not
accept this request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString())) {
- DagActionStore.DagActionValue action =
this.dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId.toString()).getDagActionValue();
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
- new RuntimeException("There is already a pending action " + action
+ " for this flow. Please wait to resubmit and wait for"
+ // If an existing resume request is still pending then do not accept
this request
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME,
+ new RuntimeException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait for"
+ " action to be completed."));
return;
}
- this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
- } catch (IOException | SQLException | SpecNotFoundException e) {
+ this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+ } catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s
%s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
- this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME, e);
}
}
- private void handleException (String flowGroup, String flowName, String
flowExecutionId, Exception e) {
+ private void handleException (String flowGroup, String flowName, String
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
Review Comment:
nit: extra space after method name
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
Review Comment:
"independently learn of"
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseAttemptStatus.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+public abstract class LeaseAttemptStatus {
+ protected LeaseAttemptStatus() {}
Review Comment:
I know the default ctor will be `public`, but just go w/ that. besides, this
class is `abstract`, so no one can instantiate it directly anyway
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseAttemptStatus.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+public abstract class LeaseAttemptStatus {
Review Comment:
I agree w/ defining this as the common base of a three-way alt. (roughly
modeling an algebraic data type). for clarity's sake, however, let's keep the
entire hierarchy together rather than splintering across four files.
do so by making these static nested classes of `MultiActiveLeaseArbiter`
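
A minimal sketch of what that nesting could look like, assuming the three
status classes keep roughly their current shape. `LeaseArbiterSketch`, the
`String` stand-in for `DagActionStore.DagAction`, the field names, and the
`describe` helper are all hypothetical and only there for illustration:

```java
// Hypothetical sketch, not the PR's code: the whole status hierarchy lives
// alongside the interface that returns it.
interface LeaseArbiterSketch {

  // String stands in for DagActionStore.DagAction so the sketch is self-contained.
  LeaseAttemptStatus tryAcquireLease(String flowAction, long eventTimeMillis);

  /** Common base of the three-way alternative. */
  abstract class LeaseAttemptStatus {}

  /** The caller acquired the lease. */
  final class LeaseObtainedStatus extends LeaseAttemptStatus {
    final long eventTimestamp;
    final long leaseAcquisitionTimestamp;

    LeaseObtainedStatus(long eventTimestamp, long leaseAcquisitionTimestamp) {
      this.eventTimestamp = eventTimestamp;
      this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
    }
  }

  /** Another participant holds the lease. */
  final class LeasedToAnotherStatus extends LeaseAttemptStatus {
    final long eventTimeMillis;
    final long minimumLingerDurationMillis;

    LeasedToAnotherStatus(long eventTimeMillis, long minimumLingerDurationMillis) {
      this.eventTimeMillis = eventTimeMillis;
      this.minimumLingerDurationMillis = minimumLingerDurationMillis;
    }
  }

  /** The event has been handled; nothing is left to lease. */
  final class NoLongerLeasingStatus extends LeaseAttemptStatus {}

  /** Callers branch on the concrete subtype, much like matching on an ADT. */
  static String describe(LeaseAttemptStatus status) {
    if (status instanceof LeaseObtainedStatus) {
      return "obtained";
    } else if (status instanceof LeasedToAnotherStatus) {
      return "leased to another";
    } else if (status instanceof NoLongerLeasingStatus) {
      return "no longer leasing";
    }
    throw new IllegalArgumentException("unknown status: " + status);
  }
}
```

Classes declared inside an interface are implicitly `public static`, so no
extra modifiers are needed to get the nested form.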
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
+ * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
+ * a) acquired the lease -> then this instance will attempt to complete
the lease
+ * b) another has acquired the lease -> then another will attempt to
complete the lease
+ * c) flow event no longer needs to be acted upon -> terminal state
+ * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
+ * needs to attempt the lease again [status (b) above].
+ * 4. Once the instance which acquired the lease completes its work on the
flow event, it calls completeLeaseUse to
+ * indicate to all other instances that the flow event no longer needs to
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+ static final Logger LOG =
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+ /**
+ * This method attempts to insert an entry into store for a particular flow
action event if one does not already
+ * exist in the store for the flow action or has expired. Regardless of the
outcome it also reads the lease
+ * acquisition timestamp of the entry for that flow action event (it could
have pre-existed in the table or been newly
+ * added by the previous write). Based on the transaction results, it will
return @LeaseAttemptStatus to determine
+ * the next action.
+ * @param flowAction uniquely identifies the flow
+ * @param eventTimeMillis is the time this flow action should occur
+ * @return LeaseAttemptStatus
+ * @throws IOException
+ */
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long
eventTimeMillis) throws IOException;
+
+ /**
+ * This method is used to indicate the owner of the lease has successfully
completed required actions while holding
+ * the lease of the flow action event. It marks the lease as "no longer
leasing", if the eventTimeMillis and
+ * leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
+ * not expire).
+ * @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
Review Comment:
it may be non-obvious, so let's describe the meaning of `false` as well
(that the caller should continue seeking to acquire the lease, as if any
actions it did successfully accomplish do not actually count)
relatedly, a minor suggestion on naming: `recordLeaseSuccess` or
`markLeaseSuccess`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
+ * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
+ * a) acquired the lease -> then this instance will attempt to complete
the lease
+ * b) another has acquired the lease -> then another will attempt to
complete the lease
+ * c) flow event no longer needs to be acted upon -> terminal state
+ * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
+ * needs to attempt the lease again [status (b) above].
+ * 4. Once the instance which acquired the lease completes its work on the
flow event, it calls completeLeaseUse to
+ * indicate to all other instances that the flow event no longer needs to
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+ static final Logger LOG =
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+ /**
+ * This method attempts to insert an entry into store for a particular flow
action event if one does not already
+ * exist in the store for the flow action or has expired. Regardless of the
outcome it also reads the lease
+ * acquisition timestamp of the entry for that flow action event (it could
have pre-existed in the table or been newly
+ * added by the previous write). Based on the transaction results, it will
return @LeaseAttemptStatus to determine
+ * the next action.
+ * @param flowAction uniquely identifies the flow
+ * @param eventTimeMillis is the time this flow action should occur
Review Comment:
"...the flow and the present action upon it"
"...the time `flowAction` was triggered"
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
+ * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
+ * a) acquired the lease -> then this instance will attempt to complete
the lease
+ * b) another has acquired the lease -> then another will attempt to
complete the lease
+ * c) flow event no longer needs to be acted upon -> terminal state
+ * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
+ * needs to attempt the lease again [status (b) above].
+ * 4. Once the instance which acquired the lease completes its work on the
flow event, it calls completeLeaseUse to
+ * indicate to all other instances that the flow event no longer needs to
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+ static final Logger LOG =
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+ /**
+ * This method attempts to insert an entry into store for a particular flow
action event if one does not already
+ * exist in the store for the flow action or has expired. Regardless of the
outcome it also reads the lease
+ * acquisition timestamp of the entry for that flow action event (it could
have pre-existed in the table or been newly
+ * added by the previous write). Based on the transaction results, it will
return @LeaseAttemptStatus to determine
+ * the next action.
+ * @param flowAction uniquely identifies the flow
+ * @param eventTimeMillis is the time this flow action should occur
+ * @return LeaseAttemptStatus
+ * @throws IOException
+ */
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long
eventTimeMillis) throws IOException;
+
+ /**
+ * This method is used to indicate the owner of the lease has successfully
completed required actions while holding
+ * the lease of the flow action event. It marks the lease as "no longer
leasing", if the eventTimeMillis and
+ * leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
+ * not expire).
+ * @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ */
+ boolean completeLeaseUse(DagActionStore.DagAction flowAction, long
eventTimeMillis, long leaseAcquisitionTimeMillis)
Review Comment:
to hammer home that this should ONLY be called by someone actually holding
the lease, what about having its only param be `LeaseObtainedStatus`?
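
A rough sketch of that shape, under the assumption that the status object
would also carry the flow action it was granted for (it does not today).
`ObtainedLease`, `InMemoryArbiterSketch`, and the `String` flow action are
hypothetical stand-ins, and the in-memory map merely imitates the store:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for LeaseObtainedStatus, extended with the flow action.
final class ObtainedLease {
  final String flowAction;
  final long eventTimeMillis;
  final long leaseAcquisitionTimeMillis;

  ObtainedLease(String flowAction, long eventTimeMillis, long leaseAcquisitionTimeMillis) {
    this.flowAction = flowAction;
    this.eventTimeMillis = eventTimeMillis;
    this.leaseAcquisitionTimeMillis = leaseAcquisitionTimeMillis;
  }
}

// Hypothetical in-memory arbiter; a real one would run these as store transactions.
final class InMemoryArbiterSketch {
  // flowAction -> {eventTimeMillis, leaseAcquisitionTimeMillis}
  private final Map<String, long[]> leases = new HashMap<>();

  // Simplification: always grants the lease, overwriting any earlier one,
  // which stands in for a re-acquisition after the earlier lease expired.
  ObtainedLease acquire(String flowAction, long eventTimeMillis, long nowMillis) {
    leases.put(flowAction, new long[] {eventTimeMillis, nowMillis});
    return new ObtainedLease(flowAction, eventTimeMillis, nowMillis);
  }

  // Only a holder of an ObtainedLease can call this. Returns false when the
  // stored lease no longer matches, i.e. the caller's work does not count.
  boolean recordLeaseSuccess(ObtainedLease lease) {
    long[] current = leases.get(lease.flowAction);
    if (current == null
        || current[0] != lease.eventTimeMillis
        || current[1] != lease.leaseAcquisitionTimeMillis) {
      return false;
    }
    leases.remove(lease.flowAction);
    return true;
  }
}
```

With this signature the compiler rules out calls from anyone who never
received a lease, and a stale holder simply gets `false` back.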
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseObtainedStatus.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The instance calling this method acquired the lease for the event in question.
The class contains the `eventTimestamp`
+associated with the lease as well as the time the lease was obtained by me or
`myLeaseAcquisitionTimestamp`.
+ */
+public class LeaseObtainedStatus extends LeaseAttemptStatus {
+ @Getter
+ private final long eventTimestamp;
+
+ @Getter
+ private final long myLeaseAcquisitionTimestamp;
Review Comment:
nit: no need to clarify it's "mine"/"yours"... just `leaseAcq...Tstamp` is
enough
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseObtainedStatus.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The instance calling this method acquired the lease for the event in question.
The class contains the `eventTimestamp`
+associated with the lease as well as the time the lease was obtained by me or
`myLeaseAcquisitionTimestamp`.
+ */
+public class LeaseObtainedStatus extends LeaseAttemptStatus {
+ @Getter
+ private final long eventTimestamp;
+
+ @Getter
+ private final long myLeaseAcquisitionTimestamp;
+
+ protected LeaseObtainedStatus(long eventTimestamp, long
myLeaseAcquisitionTimestamp) {
+ super();
+ this.eventTimestamp = eventTimestamp;
+ this.myLeaseAcquisitionTimestamp = myLeaseAcquisitionTimestamp;
+ }
Review Comment:
replace w/ `@Data` (and `@RequiredArgsConstructor`, if also necessary)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
Review Comment:
`@Slf4j`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeasedToAnotherStatus.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The event in question has been leased to another. This object contains
`reminderEventTimestamp` which is the event
+timestamp the lease is associated with as well as `minimumReminderWaitMillis`
the minimum amount of time to wait
+before returning to check if the lease has completed or expired.
+ */
+public class LeasedToAnotherStatus extends LeaseAttemptStatus {
+ @Getter
+ private final long reminderEventTimeMillis;
+
+ @Getter
+ private final long minimumReminderWaitMillis;
Review Comment:
nit: I'd advise against naming this 'reminder', as that unnecessarily
suggests how we expect the field to be used, and it's better to keep this
class blissfully unaware of what goes on downstream. instead, maybe adopt the
"linger" naming used elsewhere, or else "lease expiration".
I'm unclear whether this is meant to be an absolute timestamp or a relative
duration (e.g. 5000 more millis). the former may be preferable, since it's
more resilient to unpredictable delays, like a long GC pause.
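
To illustrate the difference, a small sketch assuming the status carried an
absolute expiration timestamp in epoch millis. `LingerSketch` and
`remainingWaitMillis` are hypothetical names, and the approach presumes the
participants' clocks are reasonably in sync:

```java
// Hypothetical helper: derive the wait from an absolute expiration timestamp
// at the moment the caller actually schedules its follow-up.
final class LingerSketch {
  private LingerSketch() {}

  /** Remaining wait until the lease expires; zero if it has already expired. */
  static long remainingWaitMillis(long leaseExpirationEpochMillis, long nowEpochMillis) {
    return Math.max(0L, leaseExpirationEpochMillis - nowEpochMillis);
  }
}
```

Say the lease expires at t=15000 and the caller is stalled until t=13000:
the absolute form yields a 2000 ms wait, whereas a relative "5000 more
millis" applied after the stall would overshoot expiry by 3000 ms.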
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
Review Comment:
some suggestions below... but overall, this is complicated documentation
that you wrote up very well!
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/NoLongerLeasingStatus.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/*
+This status is returned when a flow event was successfully leased and an
instance completed the requirements for the
+event, so no further leasing is required.
+ */
+public class NoLongerLeasingStatus extends LeaseAttemptStatus {
+
+ protected NoLongerLeasingStatus() {
+ super();
+ }
Review Comment:
I don't see a need to be `protected` (vs. just accepting synthesized default
ctor's `public`).
also, NBD, but know that an implicit `super()` would be inserted for you if
you don't explicitly call one of the super class's ctors
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -200,9 +206,9 @@ public
GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
public GobblinServiceJobScheduler(String serviceName, Config config,
FlowStatusGenerator flowStatusGenerator,
Optional<HelixManager> helixManager,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager>
quotaManager,
- SchedulerService schedulerService, Optional<Logger> log, boolean
warmStandbyEnabled) throws Exception {
+ SchedulerService schedulerService, Optional<Logger> log, boolean
warmStandbyEnabled, boolean multiActiveSchedulerEnabled,
SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
- new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log), schedulerService, quotaManager, log, warmStandbyEnabled);
+ new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log, multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler),
schedulerService, quotaManager, log, warmStandbyEnabled,
multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler);
Review Comment:
seeing this repeated over and over `(boolean, SchedulerLeaseAlgoHandler)`
makes me wonder... would `Optional<SchedulerLeaseAlgoHandler>` be sufficient?
e.g. `Optional.absent()` would stand in for when the `boolean` is currently
`false`?
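
A rough sketch of that suggestion follows, under stated assumptions: the `Orchestrator` and `SchedulerLeaseAlgoHandler` shown here are simplified, hypothetical stand-ins (the real signatures are in the PR), and `java.util.Optional` is used so the snippet is self-contained, with `Optional.empty()` playing the role of Guava's `Optional.absent()`.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for the handler type named in the diff.
interface SchedulerLeaseAlgoHandler {
  String handle(String flowName);
}

// Hypothetical, simplified orchestrator: one Optional parameter replaces the
// (boolean multiActiveSchedulerEnabled, SchedulerLeaseAlgoHandler handler) pair.
class Orchestrator {
  private final Optional<SchedulerLeaseAlgoHandler> leaseHandler;

  Orchestrator(Optional<SchedulerLeaseAlgoHandler> leaseHandler) {
    this.leaseHandler = leaseHandler;
  }

  String orchestrate(String flowName) {
    // A present handler means multi-active mode; an empty Optional means the flag was false.
    return leaseHandler
        .map(handler -> handler.handle(flowName))
        .orElse("submitted " + flowName + " directly");
  }
}
```

With this shape the two values cannot disagree, since there is no way to pass `true` together with a missing handler.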
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeasedToAnotherStatus.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The event in question has been leased to another. This object contains
`reminderEventTimestamp` which is the event
Review Comment:
strictly speaking the event wasn't leased, but rather a lease was acquired
to exclusively handle the event
##########
gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java:
##########
@@ -600,11 +600,17 @@ public static class GobblinJob extends BaseGobblinJob
implements InterruptableJo
@Override
public void executeImpl(JobExecutionContext context)
throws JobExecutionException {
- LOG.info("Starting job " + context.getJobDetail().getKey());
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ JobDetail jobDetail = context.getJobDetail();
+ LOG.info("Starting job " + jobDetail.getKey());
+ JobDataMap dataMap = jobDetail.getJobDataMap();
JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+ // Obtain trigger timestamp from trigger to pass to jobProps
+ Trigger trigger = context.getTrigger();
+ long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(triggerTimestampMillis));
Review Comment:
maybe it's seeing "previous" (fire time) used to populate "new" (event
timestamp), but I'm unclear, when reading job props--what is indicated by
"newEventTimestampMillis"? please explain and/or propose a more
self-explanatory name... perhaps "triggerTimestampMillis"?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -177,15 +207,46 @@ protected void processMessage(DecodeableKafkaRecord
message) {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
+ protected void submitFlowToDagManager(String flowGroup, String flowName) {
+ // Retrieve job execution plan by recompiling the flow spec to send to the
DagManager
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ FlowSpec spec = null;
+ try {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
+ //Send the dag to the DagManager.
+ dagManager.addDag(jobExecutionPlanDag, true, true);
+ } catch (URISyntaxException e) {
+ log.warn("Could not create URI object for flowId {} due to error {}",
flowId, e.getMessage());
+ this.unexpectedErrors.mark();
+ return;
+ } catch (SpecNotFoundException e) {
+ log.warn("Spec not found for flow group: {} name: {} Exception: {}",
flowGroup, flowName, e);
Review Comment:
just above we use `flowId`... can we do the same here?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -440,12 +446,24 @@ public synchronized void scheduleJob(Properties jobProps,
JobListener jobListene
public void runJob(Properties jobProps, JobListener jobListener) throws
JobException {
try {
Spec flowSpec =
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
- this.orchestrator.orchestrate(flowSpec);
+ String triggerTimestampMillis = extractTriggerTimestampMillis(jobProps);
+ this.orchestrator.orchestrate(flowSpec, jobProps,
Long.parseLong(triggerTimestampMillis));
} catch (Exception e) {
throw new JobException("Failed to run Spec: " +
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
}
}
+ /*
+ Helper method used to extract the trigger timestamp from Properties object.
If key for `original` trigger exists, then
+ we use that because this is a reminder event and the actual event trigger is
the time we wanted to be reminded of the
+ original trigger.
+ */
+ public static String extractTriggerTimestampMillis(Properties jobProps) {
Review Comment:
does this need to be `public`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
- // If an existing resume or kill request is still pending then do not
accept this request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString())) {
- DagActionStore.DagActionValue action =
this.dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId.toString()).getDagActionValue();
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
- new RuntimeException("There is already a pending action " + action
+ " for this flow. Please wait to resubmit and wait for"
+ // If an existing resume request is still pending then do not accept
this request
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME,
+ new RuntimeException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait for"
+ " action to be completed."));
return;
}
- this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
- } catch (IOException | SQLException | SpecNotFoundException e) {
+ this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+ } catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s
%s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
- this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME, e);
}
}
- private void handleException (String flowGroup, String flowName, String
flowExecutionId, Exception e) {
+ private void handleException (String flowGroup, String flowName, String
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
try {
- if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId)) {
+ if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
flowActionType)) {
Review Comment:
having a separate, subsequent, and repeated check of `exists` looks like a
race condition. instead whoever calls `handleException` should indicate
whether or not it previously existed upon the first (and ideally only) call to
`exists`
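
One way to read this suggestion, as a sketch only: the store, the handler, and the returned messages below are hypothetical stand-ins rather than the PR's code, and what the real `handleException` does with the flag is an assumption. The point shown is that the caller checks `exists` once and passes the answer along instead of having `handleException` query again.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for DagActionStore; it counts exists() calls for illustration.
class InMemoryDagActionStore {
  private final Set<String> pending = ConcurrentHashMap.newKeySet();
  int existsCalls = 0;

  boolean exists(String actionKey) {
    existsCalls++;
    return pending.contains(actionKey);
  }

  void add(String actionKey) {
    pending.add(actionKey);
  }
}

// Hypothetical, simplified resource handler.
class ResumeHandler {
  private final InMemoryDagActionStore store;

  ResumeHandler(InMemoryDagActionStore store) {
    this.store = store;
  }

  String resume(String actionKey) {
    // The only existence check made for this request.
    boolean alreadyPending = store.exists(actionKey);
    if (alreadyPending) {
      return handleException(actionKey, alreadyPending, "a RESUME action is already pending");
    }
    store.add(actionKey);
    return "accepted " + actionKey;
  }

  // Receives the earlier answer instead of asking the store a second time.
  private String handleException(String actionKey, boolean previouslyExisted, String reason) {
    String note = previouslyExisted ? " (pending action left in place)" : " (nothing was stored)";
    return "rejected " + actionKey + ": " + reason + note;
  }
}
```

This removes only the second lookup. The check and the add are still two separate steps, so fully closing the window would need an atomic insert-if-absent in the store itself.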
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple instances receive knowledge of a flow action event to act upon
+ * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
+ * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
+ * a) acquired the lease -> then this instance will attempt to complete
the lease
+ * b) another has acquired the lease -> then another will attempt to
complete the lease
+ * c) flow event no longer needs to be acted upon -> terminal state
+ * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
Review Comment:
"if another participant acquired the lease before this one could, then the
present participant must..."
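
The numbered steps in the quoted javadoc can be sketched roughly as below. This is an in-memory illustration under loud assumptions, not the PR's arbiter: `LeaseOutcome`, `InMemoryLeaseArbiter`, `completeLease`, and the explicit `nowMillis` parameter are all hypothetical, and the real implementation in the PR is backed by a table rather than a map. Only the three outcomes and the "check back at lease expiry" behavior are taken from the javadoc.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical outcomes mirroring cases (a), (b), and (c) in the javadoc.
enum LeaseOutcome { OBTAINED, LEASED_TO_ANOTHER, NO_LONGER_LEASING }

// Hypothetical in-memory arbiter; time is passed in explicitly to keep the sketch deterministic.
class InMemoryLeaseArbiter {
  private static final class Lease {
    final String owner;
    final long expiresAtMillis;
    boolean completed;

    Lease(String owner, long expiresAtMillis) {
      this.owner = owner;
      this.expiresAtMillis = expiresAtMillis;
    }
  }

  private final Map<String, Lease> leases = new HashMap<>();
  private final long leaseDurationMillis;

  InMemoryLeaseArbiter(long leaseDurationMillis) {
    this.leaseDurationMillis = leaseDurationMillis;
  }

  synchronized LeaseOutcome tryAcquireLease(String eventId, String instanceId, long nowMillis) {
    Lease current = leases.get(eventId);
    if (current != null && current.completed) {
      return LeaseOutcome.NO_LONGER_LEASING; // (c) the event was already handled
    }
    if (current != null && nowMillis < current.expiresAtMillis) {
      // A valid lease exists: the owner keeps it, everyone else must check back later.
      return current.owner.equals(instanceId) ? LeaseOutcome.OBTAINED : LeaseOutcome.LEASED_TO_ANOTHER;
    }
    // No lease yet, or the previous lease expired without being completed.
    leases.put(eventId, new Lease(instanceId, nowMillis + leaseDurationMillis));
    return LeaseOutcome.OBTAINED; // (a) this instance now holds the lease
  }

  // Marks the event as handled; only the current lease owner may do so.
  synchronized boolean completeLease(String eventId, String instanceId) {
    Lease current = leases.get(eventId);
    if (current == null || !current.owner.equals(instanceId)) {
      return false;
    }
    current.completed = true;
    return true;
  }
}
```

In this sketch a participant that receives `LEASED_TO_ANOTHER` would call `tryAcquireLease` again at the expiry time: it takes over if the owner never completed, and otherwise gets `NO_LONGER_LEASING`.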
Issue Time Tracking
-------------------
Worklog Id: (was: 864298)
Time Spent: 8h 10m (was: 8h)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 8h 10m
> Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation. That will be done in a separate follow-up ticket. The work in
> this ticket includes
> * define a table to do scheduler lease determination for each flow's trigger
> event and related methods to execute actions on this table
> * update DagActionStore schema and DagActionStoreMonitor to act upon new
> "LAUNCH" type events in addition to KILL/RESUME
> * update scheduler/orchestrator logic to apply the non-blocking algorithm
> when "multi-active scheduler mode" is enabled, otherwise submit events
> directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, particularly handling reminder
> events if another host is in the process of securing the lease for a
> particular flow trigger