[https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=865141&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865141]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Jun/23 09:46
Start Date: 13/Jun/23 09:46
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1227693255
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Scheduler lease determination store configuration
- public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
- public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
- public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "scheduler.lease.determination.store.db.table";
- public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
- public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY =
"reminderEventTimestampMillis";
- public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY =
"newEventTimestampMillis";
- public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"MysqlMultiActiveLeaseArbiter.constantsTable";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "MysqlMultiActiveLeaseArbiter.schedulerLeaseArbiterTable";
Review Comment:
no biggie, but for all of these, I suggest separately defining
`MYSQL_LEASE_ARBITER_PREFIX` and then prepending it to each of the many keys
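A minimal sketch of the suggested prefix constant, reusing the key suffixes from the diff above. The holder class `LeaseArbiterKeysSketch` is hypothetical and stands in for `ConfigurationKeys`; only the config keys (not the default table names) are shown.

```java
// Hypothetical holder class standing in for ConfigurationKeys; only the key strings come from the diff
final class LeaseArbiterKeysSketch {
  // Shared prefix, defined once and prepended to each MysqlMultiActiveLeaseArbiter key
  public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter.";

  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + "constantsTable";
  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + "schedulerLeaseArbiterTable";
  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + "epsilonMillis";

  private LeaseArbiterKeysSketch() {}

  public static void main(String[] args) {
    // Prints: MysqlMultiActiveLeaseArbiter.epsilonMillis
    System.out.println(SCHEDULER_EVENT_EPSILON_MILLIS_KEY);
  }
}
```

Because the concatenations involve only compile-time constants, the results are still compile-time constants, so they can be used anywhere the current literal keys are.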
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Scheduler lease determination store configuration
- public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
- public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
- public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "scheduler.lease.determination.store.db.table";
- public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
- public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY =
"reminderEventTimestampMillis";
- public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY =
"newEventTimestampMillis";
- public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"MysqlMultiActiveLeaseArbiter.constantsTable";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "MysqlMultiActiveLeaseArbiter.schedulerLeaseArbiterTable";
+ public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"MysqlMultiActiveLeaseArbiter.gobblin_scheduler_lease_determination_store";
+ public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY =
"eventToRevisitTimestampMillis";
+ public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY =
"triggerEventTimestampMillis";
+ public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
"MysqlMultiActiveLeaseArbiter.epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
Review Comment:
of course we'll tune this as we proceed... still, considering potential
causes of late triggers, such as a full GC pause, I'd expect us to need a
larger value. I'd probably start w/ 5s
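To make the trade-off concrete, here is a minimal sketch that *assumes* `epsilonMillis` is the tolerance within which two participants' trigger timestamps for the same flow action are treated as one event. `isSameEvent` is a hypothetical helper, not the arbiter's actual method. A trigger delayed by 1.5 s falls outside the diff's 100 ms default but inside a 5 s window:

```java
// Hypothetical helper; assumes epsilon is the window within which two trigger timestamps count as one event
final class EpsilonSketch {
  static boolean isSameEvent(long eventTimeMillis, long otherEventTimeMillis, long epsilonMillis) {
    return Math.abs(eventTimeMillis - otherEventTimeMillis) <= epsilonMillis;
  }

  public static void main(String[] args) {
    long onTime = 1_000_000L;
    long lateByGcPause = onTime + 1_500L; // another host's trigger fired 1.5 s late
    System.out.println(isSameEvent(onTime, lateByGcPause, 100L));   // false: looks like a new event
    System.out.println(isSameEvent(onTime, lateByGcPause, 5_000L)); // true: recognized as the same event
  }
}
```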
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -19,38 +19,38 @@
import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Data;
+
/**
* This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
- * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
- * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
- * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ * more active participants compete to take responsiblity for a particular
flow's event. The type of flow event in
+ * question does not impact the algorithm other than to uniquely identify the
flow event. Each participant uses the
+ * interface to initiate an attempt at ownership over the flow event and
receives a response indicating the status of
+ * the attempt.
*
* At a high level the lease arbiter works as follows:
- * 1. Multiple instances receive knowledge of a flow action event to act upon
- * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
- * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
- * a) acquired the lease -> then this instance will attempt to complete
the lease
- * b) another has acquired the lease -> then another will attempt to
complete the lease
- * c) flow event no longer needs to be acted upon -> terminal state
- * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
- * needs to attempt the lease again [status (b) above].
- * 4. Once the instance which acquired the lease completes its work on the
flow event, it calls completeLeaseUse to
- * indicate to all other instances that the flow event no longer needs to
be acted upon [status (c) above]
+ * 1. Multiple participants independently learn of a flow action event to act
upon
+ * 2. Each participant attempts to acquire rights or `a lease` to be the sole
participant acting on the event by
+ * calling the tryAcquireLease method below and receives the resulting
status. The status indicates whether this
+ * participant has
+ * a) LeaseObtainedStatus -> this participant will attempt to carry out
the required action before the lease expires
+ * b) LeasedToAnotherStatus -> another will attempt to carry out the
required action before the lease expires
+ * c) NoLongerLeasingStatus -> flow event no longer needs to be acted
upon or terminal state
Review Comment:
nit: "...acted upon (terminal state)"
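The three-way outcome in the javadoc above can be illustrated with a toy, single-process arbiter. This is a hypothetical in-memory sketch, not the MySQL-backed implementation: the status class names mirror the diff, while `InMemoryLeaseArbiterSketch`, the `String` event key, and the fixed `leaseDurationMillis` are assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical single-process arbiter; the status names mirror the diff, the storage does not
final class InMemoryLeaseArbiterSketch {
  abstract static class LeaseAttemptStatus {}

  // Terminal state: the event has already been handled
  static final class NoLongerLeasingStatus extends LeaseAttemptStatus {}

  // The caller holds the lease and should act before it expires
  static final class LeaseObtainedStatus extends LeaseAttemptStatus {
    final long leaseAcquisitionTimeMillis;
    LeaseObtainedStatus(long leaseAcquisitionTimeMillis) {
      this.leaseAcquisitionTimeMillis = leaseAcquisitionTimeMillis;
    }
  }

  // Another participant holds the lease; check back after the linger duration
  static final class LeasedToAnotherStatus extends LeaseAttemptStatus {
    final long minimumLingerDurationMillis;
    LeasedToAnotherStatus(long minimumLingerDurationMillis) {
      this.minimumLingerDurationMillis = minimumLingerDurationMillis;
    }
  }

  private final long leaseDurationMillis;
  private final Map<String, Long> activeLeases = new HashMap<>(); // event key -> lease acquisition time
  private final Set<String> completedEvents = new HashSet<>();

  InMemoryLeaseArbiterSketch(long leaseDurationMillis) {
    this.leaseDurationMillis = leaseDurationMillis;
  }

  synchronized LeaseAttemptStatus tryAcquireLease(String eventKey, long nowMillis) {
    if (completedEvents.contains(eventKey)) {
      return new NoLongerLeasingStatus();
    }
    Long acquiredAt = activeLeases.get(eventKey);
    if (acquiredAt != null && nowMillis < acquiredAt + leaseDurationMillis) {
      // Unexpired lease held by someone else: linger until it should expire
      return new LeasedToAnotherStatus(acquiredAt + leaseDurationMillis - nowMillis);
    }
    // No lease yet, or the previous holder's lease expired: take it over
    activeLeases.put(eventKey, nowMillis);
    return new LeaseObtainedStatus(nowMillis);
  }

  // Succeeds only if nobody re-acquired the lease since this holder obtained it
  synchronized boolean recordLeaseSuccess(String eventKey, LeaseObtainedStatus status) {
    Long acquiredAt = activeLeases.get(eventKey);
    if (acquiredAt == null || acquiredAt.longValue() != status.leaseAcquisitionTimeMillis) {
      return false;
    }
    activeLeases.remove(eventKey);
    completedEvents.add(eventKey);
    return true;
  }

  public static void main(String[] args) {
    InMemoryLeaseArbiterSketch arbiter = new InMemoryLeaseArbiterSketch(1_000L);
    String event = "flowGroup/flowName/123"; // hypothetical event key
    LeaseAttemptStatus hostA = arbiter.tryAcquireLease(event, 0L);
    LeaseAttemptStatus hostB = arbiter.tryAcquireLease(event, 200L);
    arbiter.recordLeaseSuccess(event, (LeaseObtainedStatus) hostA);
    LeaseAttemptStatus hostBReminder = arbiter.tryAcquireLease(event, 1_000L);
    System.out.println(hostA.getClass().getSimpleName());         // LeaseObtainedStatus
    System.out.println(hostB.getClass().getSimpleName());         // LeasedToAnotherStatus
    System.out.println(hostBReminder.getClass().getSimpleName()); // NoLongerLeasingStatus
  }
}
```

If host A had never recorded success, host B's reminder at 1000 ms would instead find the lease expired and obtain it, and host A's later `recordLeaseSuccess` would return `false`.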
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
* leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
* not expire).
* @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ * false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
+ * if any actions it did successfully accomplish, do not count
*/
- boolean completeLeaseUse(DagActionStore.DagAction flowAction, long
eventTimeMillis, long leaseAcquisitionTimeMillis)
- throws IOException;
+ boolean recordLeaseSuccess(DagActionStore.DagAction flowAction,
LeaseObtainedStatus status) throws IOException;
+
+ /*
+ Object used to encapsulate status of lease acquisition attempt and derived
should contain information specific to
Review Comment:
nits:
1. this (and all those below) should really be class javadoc (`/** ... */`), so it shows up in that tool
2. "a class" (not "an object")
3. "derived [classes/types]" OR "derivations"
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
* leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
* not expire).
* @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ * false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
+ * if any actions it did successfully accomplish, do not count
*/
- boolean completeLeaseUse(DagActionStore.DagAction flowAction, long
eventTimeMillis, long leaseAcquisitionTimeMillis)
- throws IOException;
+ boolean recordLeaseSuccess(DagActionStore.DagAction flowAction,
LeaseObtainedStatus status) throws IOException;
Review Comment:
nit: to streamline this interface, I'd personally tunnel the `flowAction` as
a package-protected member of `LeaseObtainedStatus`. thus, the core
identification of what to record can be derived from that single param.
in a way this is also more robust, since one `status` could never be
mismatched w/ an unrelated `flowAction`.
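A minimal sketch of this suggestion, with `DagActionStore.DagAction` reduced to a `String` stand-in and hypothetical `LeaseSuccessRecorder` / `MapRecorder` types: the flow action travels inside the status as a package-private field, so `recordLeaseSuccess` needs only the status.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; a String stands in for DagActionStore.DagAction
final class TunneledStatusSketch {
  static final class LeaseObtainedStatus {
    // Package-private: readable by an arbiter implementation in the same package, hidden from outside callers
    final String flowAction;
    private final long eventTimestamp;
    private final long leaseAcquisitionTimestamp;

    LeaseObtainedStatus(String flowAction, long eventTimestamp, long leaseAcquisitionTimestamp) {
      this.flowAction = flowAction;
      this.eventTimestamp = eventTimestamp;
      this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
    }

    long getEventTimestamp() { return eventTimestamp; }
    long getLeaseAcquisitionTimestamp() { return leaseAcquisitionTimestamp; }
  }

  // Streamlined signature: the single status parameter identifies what to record
  interface LeaseSuccessRecorder {
    boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;
  }

  // Toy recorder: the first success for a flow action wins; repeats return false
  static final class MapRecorder implements LeaseSuccessRecorder {
    final Map<String, Long> completedEvents = new HashMap<>();

    @Override
    public boolean recordLeaseSuccess(LeaseObtainedStatus status) {
      return completedEvents.putIfAbsent(status.flowAction, status.getEventTimestamp()) == null;
    }
  }
}
```

Since the status is only ever constructed by the arbiter, a caller cannot pair it with the wrong flow action.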
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -35,17 +36,12 @@ enum FlowActionType {
}
@Data
+ @AllArgsConstructor
class DagAction {
String flowGroup;
String flowName;
String flowExecutionId;
FlowActionType flowActionType;
Review Comment:
first off, this is beautiful... it nearly looks like we're writing scala! ;p
secondly, in what situations do we expect to need to mutate the fields
(i.e. why can't they be `final`)? in general, `@Data` / POJOs work quite well
when completely immutable (which is why such classes may not require
`@AllArgsConstructor` in addition to `@Data`).
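For reference: with every field `final`, `@Data`'s implied `@RequiredArgsConstructor` already covers all fields, so `@AllArgsConstructor` becomes redundant (Lombok's `@Value` goes further and also makes the class final). The plain-Java sketch below roughly shows the immutable value class that results. It is hand-written for illustration, not Lombok's literal output, and the `FlowActionType` constants are assumed.

```java
import java.util.Objects;

// Hand-written illustration of the immutable class @Data yields when all fields are final
final class DagActionSketch {
  enum FlowActionType { KILL, RESUME, LAUNCH } // constants assumed for illustration

  static final class DagAction {
    private final String flowGroup;
    private final String flowName;
    private final String flowExecutionId;
    private final FlowActionType flowActionType;

    // Equivalent of the required-args constructor implied by @Data once every field is final
    public DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.flowActionType = flowActionType;
    }

    // Getters only: no setters are generated for final fields
    public String getFlowGroup() { return flowGroup; }
    public String getFlowName() { return flowName; }
    public String getFlowExecutionId() { return flowExecutionId; }
    public FlowActionType getFlowActionType() { return flowActionType; }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof DagAction)) return false;
      DagAction other = (DagAction) o;
      return Objects.equals(flowGroup, other.flowGroup)
          && Objects.equals(flowName, other.flowName)
          && Objects.equals(flowExecutionId, other.flowExecutionId)
          && flowActionType == other.flowActionType;
    }

    @Override
    public int hashCode() {
      return Objects.hash(flowGroup, flowName, flowExecutionId, flowActionType);
    }

    @Override
    public String toString() {
      return "DagAction(flowGroup=" + flowGroup + ", flowName=" + flowName
          + ", flowExecutionId=" + flowExecutionId + ", flowActionType=" + flowActionType + ")";
    }
  }
}
```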
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
* leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
* not expire).
* @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ * false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
+ * if any actions it did successfully accomplish, do not count
*/
- boolean completeLeaseUse(DagActionStore.DagAction flowAction, long
eventTimeMillis, long leaseAcquisitionTimeMillis)
- throws IOException;
+ boolean recordLeaseSuccess(DagActionStore.DagAction flowAction,
LeaseObtainedStatus status) throws IOException;
+
+ /*
+ Object used to encapsulate status of lease acquisition attempt and derived
should contain information specific to
+ the status that results.
+ */
+ abstract class LeaseAttemptStatus {}
+
+ class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+ /*
+ The participant calling this method acquired the lease for the event in
question. The class contains the `eventTimestamp`
+ associated with the lease as well as the time the caller obtained the lease
or `leaseAcquisitionTimestamp`.
+ */
+ @Data
+ class LeaseObtainedStatus extends LeaseAttemptStatus {
+ private final long eventTimestamp;
+ private final long leaseAcquisitionTimestamp;
+ }
+
+ /*
+ This flow action event already has a valid lease owned by another host.
+ */
+ @Data
+ class LeasedToAnotherStatus extends LeaseAttemptStatus {
+ // the timestamp the lease is associated with, but it may be a different
timestamp for the same flow action
+ // (a previous participant of the event)
+ private final long eventTimeMillis;
+ // the minimum amount of time to wait before returning to check if the
lease has completed or expired
+ private final long minimumLingerDurationMillis;
Review Comment:
better to describe these in the class doc to cause them to show up in
javadoc HTML and IDE help, etc.
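Applying the doc-related nits, the comments become class-level javadoc (`/** ... */` directly above each declaration), with the field descriptions moved into the class doc. A minimal sketch: the holder class `LeaseStatusDocSketch` is hypothetical, the wording is adapted from the diff, and the hand-written constructor and getters stand in for what `@Data` generates.

```java
// Hypothetical holder standing in for the MultiActiveLeaseArbiter interface
final class LeaseStatusDocSketch {
  /**
   * A class used to encapsulate the status of a lease acquisition attempt. Derived classes should
   * contain information specific to the status that results.
   */
  abstract static class LeaseAttemptStatus {}

  /**
   * This flow action event already has a valid lease owned by another participant.
   * <ul>
   *   <li>{@code eventTimeMillis}: the event timestamp the lease is associated with; it may differ from
   *       the caller's own timestamp for the same flow action (e.g. one from a previous participant)</li>
   *   <li>{@code minimumLingerDurationMillis}: the minimum time to wait before checking back on whether
   *       the lease has completed or expired</li>
   * </ul>
   */
  static final class LeasedToAnotherStatus extends LeaseAttemptStatus {
    private final long eventTimeMillis;
    private final long minimumLingerDurationMillis;

    LeasedToAnotherStatus(long eventTimeMillis, long minimumLingerDurationMillis) {
      this.eventTimeMillis = eventTimeMillis;
      this.minimumLingerDurationMillis = minimumLingerDurationMillis;
    }

    long getEventTimeMillis() { return eventTimeMillis; }
    long getMinimumLingerDurationMillis() { return minimumLingerDurationMillis; }
  }
}
```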
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
/**
* Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a
given time
* for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
* to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
* the lease as completed by calling the
- * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not
gain
* the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
* previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
* cases.
*/
-public class SchedulerLeaseAlgoHandler {
- private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
- private final int staggerUpperBoundSec;
+@Slf4j
+public class FlowTriggerHandler {
+ private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
protected JobScheduler jobScheduler;
protected SchedulerService schedulerService;
protected DagActionStore dagActionStore;
private MetricContext metricContext;
private ContextAwareMeter numLeasesCompleted;
+
@Inject
- public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ // TODO: should multiActiveLeaseArbiter and DagActionStore be optional?
Review Comment:
I wouldn't think so... but do elaborate on your thought process
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
/**
* Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a
given time
* for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
* to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
* the lease as completed by calling the
- * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not
gain
* the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
* previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
* cases.
*/
-public class SchedulerLeaseAlgoHandler {
- private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
- private final int staggerUpperBoundSec;
+@Slf4j
+public class FlowTriggerHandler {
+ private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
protected JobScheduler jobScheduler;
protected SchedulerService schedulerService;
protected DagActionStore dagActionStore;
private MetricContext metricContext;
private ContextAwareMeter numLeasesCompleted;
+
@Inject
- public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ // TODO: should multiActiveLeaseArbiter and DagActionStore be optional?
+ public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
- this.staggerUpperBoundSec = ConfigUtils.getInt(config,
- ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
- ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
this.multiActiveLeaseArbiter = leaseDeterminationStore;
this.jobScheduler = jobScheduler;
this.schedulerService = schedulerService;
this.dagActionStore = dagActionStore;
this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
this.getClass());
- this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_FLOWS_SUBMITTED);
Review Comment:
looks like the constant needs renaming to
`GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_SUBMITTED`
(shall we update `numLeasesCompleted` as well?)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
/**
* Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a
given time
* for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
* to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
* the lease as completed by calling the
- * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not
gain
Review Comment:
I prefer the javadoc `@link` you had previously
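When restoring the `@link`, note that javadoc references a method as `Class#method`, with `#` rather than `.`. The earlier `{@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}` used a dot (and the old `MySQL` capitalization), so it likely would not have resolved. A compilable sketch with hypothetical stand-in classes:

```java
// Hypothetical stand-ins; only the {@link Class#method(params)} syntax matters here
final class ArbiterStandIn {
  boolean recordLeaseSuccess(String flowAction) {
    return flowAction != null;
  }
}

/**
 * After acquiring the lease, persists the flow action and then marks the lease as completed by calling
 * {@link ArbiterStandIn#recordLeaseSuccess(String)}.
 */
final class HandlerStandIn {
  boolean markLeaseCompleted(ArbiterStandIn arbiter, String flowAction) {
    return arbiter.recordLeaseSuccess(flowAction);
  }
}
```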
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
Review Comment:
since this represents a gaping hole in our impl, it's actually more
appropriate to scream/panic/freak out w/ a `RuntimeException` than to
presume to continue (as if completely shirking all responsibility for the
`flowAction`)
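A minimal sketch of the suggested control flow, using stand-in status classes rather than the real nested types in `MultiActiveLeaseArbiter`. Every handled branch returns, so only an unrecognized subtype reaches the `throw`. As written in the diff, the `LeasedToAnotherStatus` branch has no `return`, so it would also fall through to the `log.warn` line. The labels returned here are placeholders for the real `persistFlowAction` and `scheduleReminderForEvent` calls.

```java
// Hypothetical stand-ins for the status classes nested in MultiActiveLeaseArbiter
final class TriggerDispatchSketch {
  abstract static class LeaseAttemptStatus {}
  static final class LeaseObtainedStatus extends LeaseAttemptStatus {}
  static final class LeasedToAnotherStatus extends LeaseAttemptStatus {}
  static final class NoLongerLeasingStatus extends LeaseAttemptStatus {}

  // Returns a label for the branch taken; the real handler would persist the action or schedule a reminder
  static String handle(LeaseAttemptStatus status) {
    if (status instanceof LeaseObtainedStatus) {
      return "persistFlowAction";
    } else if (status instanceof LeasedToAnotherStatus) {
      return "scheduleReminderForEvent";
    } else if (status instanceof NoLongerLeasingStatus) {
      return "noop";
    }
    // Unknown subtype: fail loudly instead of silently dropping responsibility for the flow action
    throw new RuntimeException("Received type of leaseAttemptStatus: "
        + status.getClass().getName() + " not handled by this method");
  }
}
```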
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
- String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
try {
- LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+ log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
Review Comment:
update to "Flow Trigger Handler -"
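`scheduleReminderForEvent` adds random jitter to the minimum linger duration. Below is a hedged sketch of just that arithmetic, using a hypothetical `reminderDelayMillis` helper in place of the diff's `createCronFromDelayPeriod` cron conversion. Three details are worth noting:

- `Random.nextInt(bound)` throws `IllegalArgumentException` when `bound` is not positive, so a `schedulerMaxBackoffMillis` of 0 would fail.
- SLF4J substitutes `{}` placeholders, not `%s`, so the `log.info` call in the diff would print its format string unexpanded.
- That message says "in %s millis" but passes `trigger.getNextFireTime()`, which in Quartz is a `java.util.Date`, not a duration.

```java
import java.time.Instant;
import java.util.Random;

// Hypothetical helper; the real handler turns the delay into a cron expression via createCronFromDelayPeriod
final class ReminderDelaySketch {
  private static final Random RANDOM = new Random();

  // Minimum linger plus random jitter in [0, maxBackoffMillis) to avoid a thundering herd of reminders
  static long reminderDelayMillis(long minimumLingerDurationMillis, int maxBackoffMillis) {
    // Random.nextInt(bound) rejects bound <= 0, so treat a non-positive backoff as "no jitter"
    long jitter = maxBackoffMillis > 0 ? RANDOM.nextInt(maxBackoffMillis) : 0L;
    return minimumLingerDurationMillis + jitter;
  }

  public static void main(String[] args) {
    long delay = reminderDelayMillis(800L, 1_000);
    Instant fireTime = Instant.now().plusMillis(delay);
    // With SLF4J this would be written log.info("Flow Trigger Handler - ... in {} millis", delay, ...)
    System.out.println(String.format(
        "Flow Trigger Handler - scheduling reminder in %s millis (at %s)", delay, fireTime));
  }
}
```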
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
- String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
try {
- LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+ log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime());
this.schedulerService.getScheduler().scheduleJob(trigger);
} catch (SchedulerException e) {
- LOG.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+ log.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, status.getEventTimeMillis(), e);
}
- LOG.info(String.format("Scheduler Lease Algo Handler - [%s,
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+ log.info(String.format("Scheduler Lease Algo Handler - [%s,
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
Review Comment:
(also here)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
Review Comment:
it may be necessary to overload this w/ a form taking `LeaseObtainedStatus`...
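A minimal sketch of what that overload could look like, assuming both overloads only need an event timestamp and a minimum delay and can share one helper. The status classes, the `remainingLeaseMillis` field and the property key strings are hypothetical stand-ins, not Gobblin's real `MultiActiveLeaseArbiter` types or `ConfigurationKeys` values, and the helper returns the delay instead of scheduling a Quartz trigger:

```java
import java.util.Properties;
import java.util.Random;

/**
 * Hypothetical sketch of overloading scheduleReminderForEvent per lease status type.
 * The status classes and property keys are simplified stand-ins, not Gobblin's real types or keys.
 */
public class ReminderOverloadSketch {
  // Stand-in for MultiActiveLeaseArbiter.LeasedToAnotherStatus: another host holds the lease.
  static final class LeasedToAnotherStatus {
    final long eventTimeMillis;
    final long minimumLingerDurationMillis;

    LeasedToAnotherStatus(long eventTimeMillis, long minimumLingerDurationMillis) {
      this.eventTimeMillis = eventTimeMillis;
      this.minimumLingerDurationMillis = minimumLingerDurationMillis;
    }
  }

  // Stand-in for MultiActiveLeaseArbiter.LeaseObtainedStatus: this host holds the lease.
  static final class LeaseObtainedStatus {
    final long eventTimeMillis;
    final long remainingLeaseMillis; // assumed: time until this host's own lease expires

    LeaseObtainedStatus(long eventTimeMillis, long remainingLeaseMillis) {
      this.eventTimeMillis = eventTimeMillis;
      this.remainingLeaseMillis = remainingLeaseMillis;
    }
  }

  private final Random random;
  private final int maxBackoffMillis; // must be > 0 for Random.nextInt(bound)

  ReminderOverloadSketch(Random random, int maxBackoffMillis) {
    this.random = random;
    this.maxBackoffMillis = maxBackoffMillis;
  }

  // Overload for when another participant holds the lease: wait out its linger period.
  long scheduleReminderForEvent(Properties jobProps, LeasedToAnotherStatus status, long originalEventTimeMillis) {
    return scheduleReminder(jobProps, status.eventTimeMillis, status.minimumLingerDurationMillis,
        originalEventTimeMillis);
  }

  // Overload for when this host holds the lease but could not finish with it.
  long scheduleReminderForEvent(Properties jobProps, LeaseObtainedStatus status, long originalEventTimeMillis) {
    return scheduleReminder(jobProps, status.eventTimeMillis, status.remainingLeaseMillis, originalEventTimeMillis);
  }

  // Shared logic: jitter the delay to avoid a thundering herd and record both timestamps.
  // Returns the computed delay; the real method would instead build and schedule a trigger.
  private long scheduleReminder(Properties jobProps, long eventToRevisitMillis, long minDelayMillis,
      long eventToTriggerMillis) {
    long delayMillis = minDelayMillis + random.nextInt(maxBackoffMillis);
    jobProps.setProperty("eventToRevisitTimestampMillis", String.valueOf(eventToRevisitMillis));
    jobProps.setProperty("eventToTriggerTimestampMillis", String.valueOf(eventToTriggerMillis));
    return delayMillis;
  }
}
```

Keeping the jitter and property bookkeeping in one private helper means the two public entry points differ only in which fields they extract from their status type.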
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
try {
// If an existing resume request is still pending then do not accept
this request
if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME,
- new RuntimeException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait for"
- + " action to be completed."));
+ this.handleException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait "
Review Comment:
nit: maybe rename to `handleError` / `prepareError` (?)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -166,8 +165,8 @@ public
GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
Orchestrator orchestrator, SchedulerService schedulerService,
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
- @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
multiActiveSchedulerEnabled,
- SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) throws Exception {
+ @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+ Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) throws Exception
{
Review Comment:
rename `schedulerLeaseAlgoHandler`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
try {
// If an existing resume request is still pending then do not accept
this request
if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME,
- new RuntimeException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait for"
- + " action to be completed."));
+ this.handleException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait "
+ + "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
} catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s
%s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME, e);
+ this.handleException(e.getMessage(),
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
}
- private void handleException (String flowGroup, String flowName, String
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
- try {
- if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
flowActionType)) {
- throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
e.getMessage());
- } else {
- throw new
RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
- }
- } catch (IOException | SQLException ex) {
- throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
e.getMessage());
+ private void handleException(String exceptionMessage, HttpStatus errorType) {
+ if (errorType == HttpStatus.S_409_CONFLICT) {
+ throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
exceptionMessage);
+ } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
+ new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);
Review Comment:
the other two branches throw... should this one not? (as written, the `UpdateResponse` is constructed and then discarded)
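One way to make every branch throw, sketched under assumptions: the helper only builds the exception (hence a `prepareError`-style name) and the caller writes `throw`, so no status can be silently dropped and the trailing `return;` becomes unnecessary. `HttpStatus` and `RestLiServiceException` below are minimal stand-ins for the Rest.li classes, not their real definitions:

```java
/**
 * Hypothetical sketch: the helper builds the exception and the caller throws it, so no branch can
 * silently drop an error. HttpStatus and RestLiServiceException are minimal stand-ins for the Rest.li types.
 */
public class PrepareErrorSketch {
  enum HttpStatus { S_400_BAD_REQUEST, S_409_CONFLICT, S_500_INTERNAL_SERVER_ERROR }

  static class RestLiServiceException extends RuntimeException {
    final HttpStatus status;

    RestLiServiceException(HttpStatus status, String message) {
      super(message);
      this.status = status;
    }
  }

  // Every status maps to an exception; no branch constructs a response and discards it.
  static RestLiServiceException prepareError(String message, HttpStatus status) {
    return new RestLiServiceException(status, message);
  }

  // Caller side: `throw` makes it explicit (also to the compiler) that control does not continue.
  static void resume(boolean resumeAlreadyPending) {
    if (resumeAlreadyPending) {
      throw prepareError("There is already a pending RESUME action for this flow. "
          + "Please wait for it to complete before resubmitting.", HttpStatus.S_409_CONFLICT);
    }
    // ... otherwise record the RESUME action ...
  }
}
```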
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
* host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
* further leasing should be done for the event.
*/
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
/** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
@FunctionalInterface
protected interface CheckedFunction<T, R> {
R apply(T t) throws IOException, SQLException;
}
- public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+ public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";
Review Comment:
better to define the prefix in `ConfigurationKeys`
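A small sketch of that idea, assuming a single shared constant from which the individual keys are derived. `MYSQL_LEASE_ARBITER_PREFIX` is a hypothetical name; the derived key values match the ones shown in the `ConfigurationKeys` hunk of this PR:

```java
/**
 * Hypothetical sketch: the prefix is defined once in a ConfigurationKeys-style holder and the
 * individual keys are derived from it. MYSQL_LEASE_ARBITER_PREFIX is an illustrative name.
 */
public final class ConfigurationKeysSketch {
  private ConfigurationKeysSketch() {}

  public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";

  // Derived keys; the values equal the literals used in the ConfigurationKeys hunk.
  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";

  // The arbiter then refers to the shared constant instead of repeating the literal.
  static final class MysqlMultiActiveLeaseArbiterSketch {
    static final String CONFIG_PREFIX = ConfigurationKeysSketch.MYSQL_LEASE_ARBITER_PREFIX;
  }
}
```

With this, a rename of the prefix (as just happened from `MySQL...` to `Mysql...`) touches one line instead of every key.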
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -199,23 +197,27 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, l
int leaseValidityStatus = resultSet.getInt(4);
Review Comment:
hopefully doesn't feel like overkill, but I'd abstract this by defining a `static` inner `@Data` class with an overloaded constructor (or `static` factory method) taking a `ResultSet`
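A sketch of that abstraction written without Lombok (with `@Data` on a class of `final` fields, the constructor and getters below would be generated). Only column 4, the lease validity status, comes from the reviewed code; the class name `GetEventInfoResult` and the meaning and types of columns 1 to 3 are assumptions for illustration:

```java
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Hypothetical sketch of the reviewer's suggestion, written without Lombok: an immutable holder
 * with a static factory that reads one row of a ResultSet. Only column 4 (lease validity status)
 * comes from the reviewed code; the other columns are assumptions.
 */
public class LeaseRowSketch {
  static final class GetEventInfoResult {
    private final long eventTimestampMillis;            // column 1 (assumed)
    private final long leaseAcquisitionTimestampMillis; // column 2 (assumed)
    private final long lingerMillis;                    // column 3 (assumed)
    private final int leaseValidityStatus;              // column 4, as read by the reviewed code

    GetEventInfoResult(long eventTimestampMillis, long leaseAcquisitionTimestampMillis, long lingerMillis,
        int leaseValidityStatus) {
      this.eventTimestampMillis = eventTimestampMillis;
      this.leaseAcquisitionTimestampMillis = leaseAcquisitionTimestampMillis;
      this.lingerMillis = lingerMillis;
      this.leaseValidityStatus = leaseValidityStatus;
    }

    // Reads the row the cursor is currently positioned on; the caller must have called next() already.
    static GetEventInfoResult fromResultSet(ResultSet resultSet) throws SQLException {
      return new GetEventInfoResult(resultSet.getLong(1), resultSet.getLong(2), resultSet.getLong(3),
          resultSet.getInt(4));
    }

    long getEventTimestampMillis() { return eventTimestampMillis; }
    long getLeaseAcquisitionTimestampMillis() { return leaseAcquisitionTimestampMillis; }
    long getLingerMillis() { return lingerMillis; }
    int getLeaseValidityStatus() { return leaseValidityStatus; }
  }
}
```

The column-index knowledge then lives in one factory method, and `tryAcquireLease` can branch on named getters instead of `resultSet.getInt(4)`.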
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -100,8 +114,8 @@ public class MySQLMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Insert or update row to acquire lease if values have not changed since
the previous read
// Need to define three separate statements to handle cases where row does
not exist or has null values to check
protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
- + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
- + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?); "
+ + SELECT_AFTER_INSERT_STATEMENT;
Review Comment:
just thinking... it may be clearer not to append this to the constant, but rather to concatenate them at the point of use
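A sketch of composing at the point of use, assuming simplified SQL: each constant holds exactly one statement and a small method assembles the INSERT-then-SELECT text. The `WHERE` clause and the columns selected are assumptions, not the exact strings from `MysqlMultiActiveLeaseArbiter`:

```java
/**
 * Hypothetical sketch: each constant holds exactly one SQL statement, and the combined
 * INSERT-then-SELECT text is assembled where it is used. The SQL is simplified from the reviewed code.
 */
public class StatementCompositionSketch {
  static final String WHERE_CLAUSE_TO_MATCH_KEY =
      "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? AND flow_action=?";

  static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
      + "(flow_group, flow_name, flow_execution_id, flow_action, event_timestamp) VALUES (?, ?, ?, ?, ?)";

  // Assumed shape of the follow-up read; the real SELECT_AFTER_INSERT_STATEMENT may select other columns.
  static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT event_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY;

  // Point of use: a reader sees here that two statements are sent together.
  static String acquireLeaseIfNewRowSql(String tableName) {
    return String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, tableName) + "; "
        + String.format(SELECT_AFTER_INSERT_STATEMENT, tableName);
  }
}
```

Making the `; ` join visible at the call site also surfaces a dependency worth checking: sending several semicolon-separated statements in one JDBC call only works with MySQL Connector/J when its `allowMultiQueries` connection property is enabled.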
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
}
}
- private void clearUpDagAction(DagId dagId) throws IOException {
+ private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType
flowActionType) throws IOException {
Review Comment:
nit: "clear up" seems indirect and carries a wide variety of connotations (from misunderstandings and weather... to skin care).
what are we doing here? "resolving" the action? "dismissing"? "deleting"?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java:
##########
@@ -581,7 +581,7 @@ public void close() throws IOException {
/**
* Get a {@link org.quartz.Trigger} from the given job configuration
properties.
*/
- public Trigger getTrigger(JobKey jobKey, Properties jobProps) {
+ public Trigger createTriggerForJob(JobKey jobKey, Properties jobProps) {
Review Comment:
couldn't this be `static`? if so, let's!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
}
}
- private void clearUpDagAction(DagId dagId) throws IOException {
+ private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType
flowActionType) throws IOException {
if (this.dagActionStore.isPresent()) {
this.dagActionStore.get().deleteDagAction(
- new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId,
- DagActionStore.FlowActionType.KILL));
+ new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId, flowActionType));
Review Comment:
minor, but depending on how prevalent this is, it might be reasonable to supply an additional constructor:
```
public DagAction(DagId dagId, FlowActionType flowActionType) { ... }
```
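A self-contained sketch of that convenience constructor, delegating to the canonical one so field assignment stays in one place. `DagId`, `DagAction` and `FlowActionType` here are simplified stand-ins keeping only the fields the reviewed code reads:

```java
/**
 * Hypothetical sketch of the suggested convenience constructor. DagId, DagAction and
 * FlowActionType are simplified stand-ins for the Gobblin classes, keeping only the fields used here.
 */
public class DagActionConstructorSketch {
  enum FlowActionType { KILL, RESUME }

  // Stand-in for DagManager.DagId.
  static final class DagId {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;

    DagId(String flowGroup, String flowName, String flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }
  }

  // Stand-in for DagActionStore.DagAction.
  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final FlowActionType flowActionType;

    DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.flowActionType = flowActionType;
    }

    // Convenience constructor: delegates to the canonical one, so callers pass a DagId directly.
    DagAction(DagId dagId, FlowActionType flowActionType) {
      this(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId, flowActionType);
    }
  }
}
```

One design point to check first: per the paths in this review, `DagActionStore` sits under `gobblin-runtime` while `DagManager` is in `gobblin-service`, so a `DagAction(DagId, ...)` constructor may create a reverse module dependency; a factory on the service side (e.g. a hypothetical `DagId.toDagAction(FlowActionType)`) would avoid that.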
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -106,22 +104,20 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private FlowStatusGenerator flowStatusGenerator;
private UserQuotaManager quotaManager;
- private boolean isMultiActiveSchedulerEnabled;
- private SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler;
+ private Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler;
Review Comment:
rename `schedulerLeaseAlgoHandler`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -317,26 +312,27 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
}
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
- if (this.isMultiActiveSchedulerEnabled) {
+ if (schedulerLeaseAlgoHandler.isPresent()) {
+ // If triggerTimestampMillis is 0, then it was not set by the job
trigger handler, and we cannot handle this event
+ if (triggerTimestampMillis == 0L) {
+ _log.warn("Skipping execution of spec: {} because missing trigger
timestamp in job properties",
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration
skipped because no trigger timestamp "
+ + "associated with flow action.");
+ if (this.eventSubmitter.isPresent()) {
+ new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
Review Comment:
tip: `ifPresent()`
(note: this works easily and naturally... unless checked exceptions are involved)
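A sketch of the tip and its caveat using `java.util.Optional`. If `eventSubmitter` is Guava's `com.google.common.base.Optional` (an assumption here), it has no `ifPresent`; Guava 21+ offers `toJavaUtil()` to convert. `EventSubmitter` below is a stand-in interface and `"FLOW_FAILED"` a stand-in for `TimingEvent.FlowTimings.FLOW_FAILED`:

```java
import java.io.IOException;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch of replacing isPresent()/get() with ifPresent(), using java.util.Optional.
 * EventSubmitter is a stand-in interface, not Gobblin's EventSubmitter/TimingEvent API.
 */
public class IfPresentSketch {
  interface EventSubmitter {
    void submit(String eventName, Map<String, String> metadata);
  }

  // Instead of: if (eventSubmitter.isPresent()) { use(eventSubmitter.get()); }
  static void reportSkippedFlow(Optional<EventSubmitter> eventSubmitter, Map<String, String> flowMetadata) {
    eventSubmitter.ifPresent(submitter -> submitter.submit("FLOW_FAILED", flowMetadata));
  }

  // The caveat: ifPresent takes a Consumer, whose accept() cannot throw checked exceptions,
  // so a call that throws IOException does not compile inside the lambda.
  interface CheckedEventSubmitter {
    void submit(String eventName) throws IOException;
  }

  static void reportWithCheckedSubmitter(Optional<CheckedEventSubmitter> eventSubmitter) throws IOException {
    // eventSubmitter.ifPresent(s -> s.submit("FLOW_FAILED"));  // compile error: unhandled IOException
    if (eventSubmitter.isPresent()) {
      eventSubmitter.get().submit("FLOW_FAILED");
    }
  }
}
```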
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
Review Comment:
shouldn't we confirm the return value to determine whether to schedule a reminder?
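A sketch of acting on that return value rather than discarding it, assuming `false` means the lease completion could not be recorded and that the right fallback is a self-reminder. `LeaseArbiter` is a stand-in for `MultiActiveLeaseArbiter`, the DagActionStore write is omitted, and reminders are collected in a list instead of being scheduled with Quartz:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of acting on the lease-completion result instead of ignoring it.
 * LeaseArbiter is a stand-in for MultiActiveLeaseArbiter; a false return is assumed to mean
 * the lease completion could not be recorded.
 */
public class LeaseOutcomeSketch {
  interface LeaseArbiter {
    boolean recordLeaseSuccess(String flowAction) throws IOException;
  }

  enum Outcome { COMPLETED, REMINDER_SCHEDULED }

  private final LeaseArbiter arbiter;
  final List<String> scheduledReminders = new ArrayList<>(); // stands in for scheduled reminder triggers

  LeaseOutcomeSketch(LeaseArbiter arbiter) {
    this.arbiter = arbiter;
  }

  Outcome handleLeaseObtained(String flowAction) throws IOException {
    // Writing the flow action to the DagActionStore is omitted from this sketch.
    if (arbiter.recordLeaseSuccess(flowAction)) {
      return Outcome.COMPLETED;
    }
    // Completion was not recorded, so revisit the event later rather than silently dropping it.
    scheduledReminders.add(flowAction);
    return Outcome.REMINDER_SCHEDULED;
  }
}
```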
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -378,6 +374,26 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ public void submitFlowToDagManager(FlowSpec flowSpec,
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDag)
+ throws IOException {
+ if (!jobExecutionPlanDag.isPresent()) {
+ jobExecutionPlanDag = Optional.of(specCompiler.compileFlow(flowSpec));
+ }
Review Comment:
while `Optional` is generally great, it's only needed when callers are unaware of whether they are holding on to a `Dag<>`. when that is statically known by the caller, it is clearer to overload `submitFlowToDagManager` into both a unary and a binary form.
here the unary form merely compiles the DAG before forwarding/delegating to the binary one:
```
public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException {
  submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
}
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
- String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
Review Comment:
would be ideal if this invocation could be `static`!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
Review Comment:
a. I like the `@link`
b. "a reminder for itself" => "a self-reminder"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
+ private boolean persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(), flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to commit to dag action store?
- return false;
}
/**
- * This method is used by {@link SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for itself
- * to check on the other participant's progress to finish acting on a flow action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps, LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
- String cronExpression = createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + random.nextInt(staggerUpperBoundSec));
+ String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in addition to our own event timestamp which may be different
- jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
- jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
+ // in addition to the event we want to initiate
+ jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+ jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
try {
- LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] - attempting to schedule reminder for event %s in %s millis",
- flowAction, originalEventTimeMillis, status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+ log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] - attempting to schedule reminder for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(), trigger.getNextFireTime());
this.schedulerService.getScheduler().scheduleJob(trigger);
} catch (SchedulerException e) {
- LOG.warn("Failed to add job reminder due to SchedulerException for job %s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+ log.warn("Failed to add job reminder due to SchedulerException for job %s trigger event %s ", key, status.getEventTimeMillis(), e);
}
- LOG.info(String.format("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
- flowAction, originalEventTimeMillis, status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+ log.info(String.format("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(), trigger.getNextFireTime()));
}
/**
* These methods should only be called from the Orchestrator or JobScheduler classes as it directly adds jobs to the
* Quartz scheduler
- * @param delayPeriodSeconds
+ * @param delayPeriodMillis
* @return
*/
- protected static String createCronFromDelayPeriod(long delayPeriodSeconds) {
+ protected static String createCronFromDelayPeriod(long delayPeriodMillis) {
LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
- LocalDateTime delaySecondsLater = now.plus(delayPeriodSeconds, ChronoUnit.SECONDS);
+ LocalDateTime delaySecondsLater = now.plus(delayPeriodMillis, ChronoUnit.MILLIS);
Review Comment:
nit: `delaySecondsLater` is neither seconds nor an interval, but an absolute time in millis
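A minimal sketch of the same conversion under a clearer variable name, assuming the Quartz cron field order `seconds minutes hours day-of-month month day-of-week year` for a one-shot trigger; the hunk does not show how Gobblin's `createCronFromDelayPeriod` formats its result, so the formatter here is an assumption. The `Clock` parameter is also an addition for testability, standing in for `LocalDateTime.now(ZoneId.of("UTC"))`.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class CronFromDelaySketch {
  // Assumed Quartz cron field order: seconds minutes hours day-of-month month day-of-week year.
  // '?' ("no specific value") is used for day-of-week because day-of-month is fixed.
  private static final DateTimeFormatter ONE_SHOT_QUARTZ_CRON =
      DateTimeFormatter.ofPattern("s m H d M '?' yyyy");

  static String createCronFromDelayPeriod(long delayPeriodMillis, Clock clock) {
    LocalDateTime now = LocalDateTime.now(clock);
    // An absolute point in time at which the reminder should fire, not a duration
    LocalDateTime fireTime = now.plus(delayPeriodMillis, ChronoUnit.MILLIS);
    // Formatting drops sub-second precision, matching cron's one-second resolution
    return fireTime.format(ONE_SHOT_QUARTZ_CRON);
  }

  public static void main(String[] args) {
    Clock fixedUtc = Clock.fixed(Instant.parse("2023-06-13T09:46:00Z"), ZoneOffset.UTC);
    // 90 s after 09:46:00 UTC is 09:47:30 UTC on 13 June 2023
    System.out.println(createCronFromDelayPeriod(90_000L, fixedUtc)); // 30 47 9 13 6 ? 2023
  }
}
```

The diff also fixes a unit mismatch: the old call passed a millisecond value plus `staggerUpperBoundSec` into a parameter named `delayPeriodSeconds`, which was then added with `ChronoUnit.SECONDS`.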
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -165,9 +161,8 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
- Optional<DagManager> dagManager, Optional<Logger> log, @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean multiActiveSchedulerEnabled,
- SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) {
- this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler);
+ Optional<DagManager> dagManager, Optional<Logger> log, Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) {
Review Comment:
need to rename `schedulerLeaseAlgoHandler`
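Since the constructor now takes an `Optional<FlowTriggerHandler>` instead of the `multiActiveSchedulerEnabled` flag, the handler's presence effectively selects the mode described in the ticket: arbitrate through the lease in multi-active mode, otherwise submit straight to the `DagManager`. A minimal sketch, assuming hypothetical stand-in interfaces (`TriggerHandler`, `DagSubmitter`), `java.util.Optional` (the diff may use Guava's), and `flowTriggerHandler` as one possible name for the renamed field; the real `Orchestrator` wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

class OrchestratorGatingSketch {
  // Hypothetical stand-ins for FlowTriggerHandler and DagManager
  interface TriggerHandler {
    void handleTriggerEvent(Properties jobProps, String flowAction, long eventTimeMillis);
  }

  interface DagSubmitter {
    void submit(String flowAction);
  }

  private final Optional<TriggerHandler> flowTriggerHandler;
  private final DagSubmitter dagManager;

  OrchestratorGatingSketch(Optional<TriggerHandler> flowTriggerHandler, DagSubmitter dagManager) {
    this.flowTriggerHandler = flowTriggerHandler;
    this.dagManager = dagManager;
  }

  /** Returns true if the trigger was routed through lease arbitration. */
  boolean onSchedulerTrigger(Properties jobProps, String flowAction, long eventTimeMillis) {
    if (flowTriggerHandler.isPresent()) {
      // Multi-active mode: hosts compete for the lease on this trigger event
      flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, eventTimeMillis);
      return true;
    }
    // Single-active mode: hand the flow straight to the DagManager
    dagManager.submit(flowAction);
    return false;
  }

  public static void main(String[] args) {
    List<String> submitted = new ArrayList<>();
    List<String> arbitrated = new ArrayList<>();
    TriggerHandler handler = (props, action, ts) -> arbitrated.add(action);

    new OrchestratorGatingSketch(Optional.empty(), submitted::add)
        .onSchedulerTrigger(new Properties(), "flowA", 1L);
    new OrchestratorGatingSketch(Optional.of(handler), submitted::add)
        .onSchedulerTrigger(new Properties(), "flowB", 2L);
    System.out.println(submitted + " " + arbitrated); // [flowA] [flowB]
  }
}
```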
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
* host should actually complete its work while having the lease and then mark the flow action as NULL to indicate no
* further leasing should be done for the event.
*/
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
/** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
@FunctionalInterface
protected interface CheckedFunction<T, R> {
R apply(T t) throws IOException, SQLException;
}
- public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+ public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";
protected final DataSource dataSource;
private final String leaseArbiterTableName;
private final String constantsTableName;
private final int epsilon;
private final int linger;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S ("
+ + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, "
+ + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
Review Comment:
suggest no default since probably shouldn't be possible to insert w/o a specific `event_timestamp`
(the default for `lease_acq_tstamp` seems reasonable)
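A sketch of the DDL with that suggestion applied: `event_timestamp` is `NOT NULL` with no default, while the lease acquisition column (named `lease_acquisition_timestamp` here, hypothetically) keeps `DEFAULT CURRENT_TIMESTAMP`. The length constants are hypothetical local values, and columns past `event_timestamp` are guesses since the hunk is cut off there. The sketch formats with `%s` rather than `%S`: in `String.format`, `%S` upper-cases the argument, so the table would be created under an upper-cased name, which matters on MySQL servers where table names are case-sensitive.

```java
class LeaseArbiterDdlSketch {
  // Hypothetical limits; the diff reads these from ServiceConfigKeys
  static final int MAX_FLOW_GROUP_LENGTH = 128;
  static final int MAX_FLOW_NAME_LENGTH = 128;
  static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13;

  // Lower-case %s keeps the configured table name as-is; %S would upper-case it
  static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
      + "flow_group varchar(" + MAX_FLOW_GROUP_LENGTH + ") NOT NULL, "
      + "flow_name varchar(" + MAX_FLOW_NAME_LENGTH + ") NOT NULL, "
      + "flow_execution_id varchar(" + MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
      + "flow_action varchar(100) NOT NULL, "
      // No default: every row must carry the trigger event's own timestamp
      + "event_timestamp TIMESTAMP NOT NULL, "
      // Defaulting the acquisition time to the insert time is reasonable
      + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP)";

  public static void main(String[] args) {
    System.out.println(String.format(CREATE_LEASE_ARBITER_TABLE_STATEMENT,
        "gobblin_scheduler_lease_determination_store"));
  }
}
```

One MySQL caveat worth checking: when `explicit_defaults_for_timestamp` is disabled (the MySQL 5.7 default), the first `TIMESTAMP` column declared without an explicit `DEFAULT`, `ON UPDATE`, or `NULL` attribute is implicitly given `DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP`, which would defeat the intent here. Also, the quoted hunk sizes `flow_name` with `MAX_FLOW_GROUP_LENGTH`; if a distinct flow-name limit is intended, that line may need a different constant.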
Issue Time Tracking
-------------------
Worklog Id: (was: 865141)
Time Spent: 13h 20m (was: 13h 10m)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 13h 20m
> Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation. That will be done in a separate follow-up ticket. The work in
> this ticket includes:
> * define a table to do scheduler lease determination for each flow's trigger
> event and related methods to execute actions on this table
> * update DagActionStore schema and DagActionStoreMonitor to act upon new
> "LAUNCH" type events in addition to KILL/RESUME
> * update scheduler/orchestrator logic to apply the non-blocking algorithm
> when "multi-active scheduler mode" is enabled, otherwise submit events
> directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, particularly handling reminder
> events if another host is in the process of securing the lease for a
> particular flow trigger
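The non-blocking algorithm in the last bullet reduces to a three-way dispatch on the result of `tryAcquireLease`, as in the `handleTriggerEvent` hunk above. A minimal sketch with hypothetical stand-in status classes that records actions instead of touching the `DagActionStore` or Quartz; it stays Java 8 compatible, so it uses `instanceof` checks rather than sealed types. In the quoted hunk the `LeasedToAnotherStatus` branch has no `return`, so after scheduling the reminder it would also reach the `log.warn(...)` for unhandled types; the sketch returns from every handled branch.

```java
import java.util.ArrayList;
import java.util.List;

class TriggerDispatchSketch {
  // Hypothetical stand-ins for MultiActiveLeaseArbiter's lease attempt outcomes
  interface LeaseAttemptStatus {}

  static final class LeaseObtainedStatus implements LeaseAttemptStatus {}

  static final class LeasedToAnotherStatus implements LeaseAttemptStatus {
    final long minimumLingerDurationMillis;

    LeasedToAnotherStatus(long minimumLingerDurationMillis) {
      this.minimumLingerDurationMillis = minimumLingerDurationMillis;
    }
  }

  static final class NoLongerLeasingStatus implements LeaseAttemptStatus {}

  // Records what the handler decided, in place of real side effects
  final List<String> actions = new ArrayList<>();

  void handleTriggerEvent(LeaseAttemptStatus status) {
    if (status instanceof LeaseObtainedStatus) {
      // This host won the lease: persist the LAUNCH action, then record lease success
      actions.add("persist");
      return;
    }
    if (status instanceof LeasedToAnotherStatus) {
      // Another host holds the lease: set a self-reminder for after it can expire
      long waitMillis = ((LeasedToAnotherStatus) status).minimumLingerDurationMillis;
      actions.add("remind after " + waitMillis + " ms");
      return;
    }
    if (status instanceof NoLongerLeasingStatus) {
      // The event was already completed by some host: nothing left to do
      return;
    }
    actions.add("warn: unhandled " + status.getClass().getName());
  }

  public static void main(String[] args) {
    TriggerDispatchSketch handler = new TriggerDispatchSketch();
    handler.handleTriggerEvent(new LeaseObtainedStatus());
    handler.handleTriggerEvent(new LeasedToAnotherStatus(30_000L));
    handler.handleTriggerEvent(new NoLongerLeasingStatus());
    System.out.println(handler.actions); // [persist, remind after 30000 ms]
  }
}
```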
--
This message was sent by Atlassian Jira
(v8.20.10#820010)