phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1227693255
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Scheduler lease determination store configuration
- public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
- public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
- public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "scheduler.lease.determination.store.db.table";
- public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
- public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY =
"reminderEventTimestampMillis";
- public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY =
"newEventTimestampMillis";
- public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"MysqlMultiActiveLeaseArbiter.constantsTable";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "MysqlMultiActiveLeaseArbiter.schedulerLeaseArbiterTable";
Review Comment:
no biggie, but for all of these keys I suggest defining
`MYSQL_LEASE_ARBITER_PREFIX` once and prepending it to each key
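A minimal sketch of the prefix suggestion, assuming the key suffixes shown in the hunk above. The holder class `LeaseArbiterKeysSketch` is hypothetical (in the PR the constants live in `ConfigurationKeys`), and only three of the keys are shown:

```java
// Hypothetical holder class for illustration; in the PR these constants live in ConfigurationKeys.
final class LeaseArbiterKeysSketch {
  private LeaseArbiterKeysSketch() {}

  // Shared prefix defined once (the name comes from the review suggestion).
  static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";

  // Each key is built from the prefix, so renaming the prefix touches a single line.
  static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
  static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
  static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
}
```

Because the concatenation only involves compile-time constants, the resulting key strings are identical to the literals in the diff.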
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
// Scheduler lease determination store configuration
- public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"multi.active.scheduler.constants.db.table";
- public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "multi.active.scheduler.";
- public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "scheduler.lease.determination.store.db.table";
- public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
- public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY =
"reminderEventTimestampMillis";
- public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY =
"newEventTimestampMillis";
- public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
"MysqlMultiActiveLeaseArbiter.constantsTable";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= "MysqlMultiActiveLeaseArbiter.schedulerLeaseArbiterTable";
+ public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"MysqlMultiActiveLeaseArbiter.gobblin_scheduler_lease_determination_store";
+ public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY =
"eventToRevisitTimestampMillis";
+ public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY =
"triggerEventTimestampMillis";
+ public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
"MysqlMultiActiveLeaseArbiter.epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
Review Comment:
of course we'll tune this as we proceed. still, considering potential
causes of late triggers, such as a full GC pause, I'd expect we need a value
larger than 100 ms. I'd probably start w/ 5s (5000 ms)
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -19,38 +19,38 @@
import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Data;
+
/**
* This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
- * more active instances compete over ownership of a particular flow's event.
The type of flow event in question does
- * not impact the algorithm other than to uniquely identify the flow event.
Each instance uses the interface to initiate
- * an attempt at ownership over the flow event and receives a response
indicating the status of the attempt.
+ * more active participants compete to take responsiblity for a particular
flow's event. The type of flow event in
+ * question does not impact the algorithm other than to uniquely identify the
flow event. Each participant uses the
+ * interface to initiate an attempt at ownership over the flow event and
receives a response indicating the status of
+ * the attempt.
*
* At a high level the lease arbiter works as follows:
- * 1. Multiple instances receive knowledge of a flow action event to act upon
- * 2. Each instance attempts to acquire rights or `a lease` to be the sole
instance acting on the event by calling the
- * tryAcquireLease method below and receives the resulting status. The
status indicates whether this instance has
- * a) acquired the lease -> then this instance will attempt to complete
the lease
- * b) another has acquired the lease -> then another will attempt to
complete the lease
- * c) flow event no longer needs to be acted upon -> terminal state
- * 3. If another has acquired the lease, then the instance will check back in
at the time of lease expiry to see if it
- * needs to attempt the lease again [status (b) above].
- * 4. Once the instance which acquired the lease completes its work on the
flow event, it calls completeLeaseUse to
- * indicate to all other instances that the flow event no longer needs to
be acted upon [status (c) above]
+ * 1. Multiple participants independently learn of a flow action event to act
upon
+ * 2. Each participant attempts to acquire rights or `a lease` to be the sole
participant acting on the event by
+ * calling the tryAcquireLease method below and receives the resulting
status. The status indicates whether this
+ * participant has
+ * a) LeaseObtainedStatus -> this participant will attempt to carry out
the required action before the lease expires
+ * b) LeasedToAnotherStatus -> another will attempt to carry out the
required action before the lease expires
+ * c) NoLongerLeasingStatus -> flow event no longer needs to be acted
upon or terminal state
Review Comment:
nit: "...acted upon (terminal state)"
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
* leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
* not expire).
* @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ * false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
+ * if any actions it did successfully accomplish, do not count
*/
- boolean completeLeaseUse(DagActionStore.DagAction flowAction, long
eventTimeMillis, long leaseAcquisitionTimeMillis)
- throws IOException;
+ boolean recordLeaseSuccess(DagActionStore.DagAction flowAction,
LeaseObtainedStatus status) throws IOException;
+
+ /*
+ Object used to encapsulate status of lease acquisition attempt and derived
should contain information specific to
Review Comment:
nits:
- this (and all those below) should really be class javadoc (`/** ... */`
rather than `/* ... */`) so that it shows up in the javadoc tool
- it's a class (not an object)
- "derived [classes/types]" OR "derivations"
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
* leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
* not expire).
* @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ * false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
+ * if any actions it did successfully accomplish, do not count
*/
- boolean completeLeaseUse(DagActionStore.DagAction flowAction, long
eventTimeMillis, long leaseAcquisitionTimeMillis)
- throws IOException;
+ boolean recordLeaseSuccess(DagActionStore.DagAction flowAction,
LeaseObtainedStatus status) throws IOException;
Review Comment:
nit: to streamline this interface, I'd personally tunnel the `flowAction` as
a package-private member of `LeaseObtainedStatus`. thus, the core
identification of what to record can be derived from that single param.
in a way this is also more robust, since a `status` could never be
mismatched w/ an unrelated `flowAction`.
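A rough sketch of that shape, under stated assumptions: `DagActionSketch`, `LeaseObtainedStatusSketch`, and `LeaseArbiterSketch` are simplified, hypothetical stand-ins for `DagActionStore.DagAction`, `LeaseObtainedStatus`, and `MultiActiveLeaseArbiter`, not the PR's actual classes.

```java
import java.io.IOException;

// Simplified, hypothetical stand-in for DagActionStore.DagAction (reduced to two fields).
final class DagActionSketch {
  final String flowGroup;
  final String flowName;

  DagActionSketch(String flowGroup, String flowName) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }
}

// The status carries the flow action it was issued for, so the two cannot be mismatched.
final class LeaseObtainedStatusSketch {
  private final DagActionSketch flowAction;
  private final long eventTimestamp;
  private final long leaseAcquisitionTimestamp;

  LeaseObtainedStatusSketch(DagActionSketch flowAction, long eventTimestamp, long leaseAcquisitionTimestamp) {
    this.flowAction = flowAction;
    this.eventTimestamp = eventTimestamp;
    this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
  }

  // Package-private accessor: visible to the arbiter implementation, not to outside callers.
  DagActionSketch getFlowAction() {
    return flowAction;
  }

  long getEventTimestamp() {
    return eventTimestamp;
  }

  long getLeaseAcquisitionTimestamp() {
    return leaseAcquisitionTimestamp;
  }
}

// With the action tunneled through the status, the method needs a single parameter.
interface LeaseArbiterSketch {
  boolean recordLeaseSuccess(LeaseObtainedStatusSketch status) throws IOException;
}
```

An implementation would read `status.getFlowAction()` to identify the row to update, which is the robustness point made above.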
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -35,17 +36,12 @@ enum FlowActionType {
}
@Data
+ @AllArgsConstructor
class DagAction {
String flowGroup;
String flowName;
String flowExecutionId;
FlowActionType flowActionType;
Review Comment:
first off, this is beautiful... nearly looks like we're writing scala! ;p
secondly, in what situations do we expect to need to mutate the fields
(i.e. why can't they be `final`?)? in general `@Data` / POJOs work quite well
being completely immutable. (when every field is `final`, the constructor that
`@Data` generates already takes all of them, which is why such classes may not
require `@AllArgsConstructor` in addition to `@Data`.)
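For illustration, here is a hand-written approximation of what an all-`final` `@Data` class amounts to: a constructor over every field, getters, no setters, and value-based `equals`/`hashCode` (`toString` is omitted for brevity). The class name `ImmutableDagActionSketch` is hypothetical, and `flowActionType` is a `String` here only to keep the sketch self-contained; in the PR it is the `FlowActionType` enum.

```java
import java.util.Objects;

// Hypothetical, hand-written approximation of an immutable @Data class.
final class ImmutableDagActionSketch {
  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final String flowActionType; // an enum in the PR; a String here for self-containment

  // With all fields final, this is the only constructor needed.
  ImmutableDagActionSketch(String flowGroup, String flowName, String flowExecutionId, String flowActionType) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.flowActionType = flowActionType;
  }

  String getFlowGroup() { return flowGroup; }
  String getFlowName() { return flowName; }
  String getFlowExecutionId() { return flowExecutionId; }
  String getFlowActionType() { return flowActionType; }

  // Value-based equality: two actions with the same field values are interchangeable.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ImmutableDagActionSketch)) {
      return false;
    }
    ImmutableDagActionSketch other = (ImmutableDagActionSketch) o;
    return Objects.equals(flowGroup, other.flowGroup)
        && Objects.equals(flowName, other.flowName)
        && Objects.equals(flowExecutionId, other.flowExecutionId)
        && Objects.equals(flowActionType, other.flowActionType);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId, flowActionType);
  }
}
```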
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
* leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
* not expire).
* @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ * false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
+ * if any actions it did successfully accomplish, do not count
*/
- boolean completeLeaseUse(DagActionStore.DagAction flowAction, long
eventTimeMillis, long leaseAcquisitionTimeMillis)
- throws IOException;
+ boolean recordLeaseSuccess(DagActionStore.DagAction flowAction,
LeaseObtainedStatus status) throws IOException;
+
+ /*
+ Object used to encapsulate status of lease acquisition attempt and derived
should contain information specific to
+ the status that results.
+ */
+ abstract class LeaseAttemptStatus {}
+
+ class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+ /*
+ The participant calling this method acquired the lease for the event in
question. The class contains the `eventTimestamp`
+ associated with the lease as well as the time the caller obtained the lease
or `leaseAcquisitionTimestamp`.
+ */
+ @Data
+ class LeaseObtainedStatus extends LeaseAttemptStatus {
+ private final long eventTimestamp;
+ private final long leaseAcquisitionTimestamp;
+ }
+
+ /*
+ This flow action event already has a valid lease owned by another host.
+ */
+ @Data
+ class LeasedToAnotherStatus extends LeaseAttemptStatus {
+ // the timestamp the lease is associated with, but it may be a different
timestamp for the same flow action
+ // (a previous participant of the event)
+ private final long eventTimeMillis;
+ // the minimum amount of time to wait before returning to check if the
lease has completed or expired
+ private final long minimumLingerDurationMillis;
Review Comment:
better to describe these fields in the class javadoc so that they show up in
the javadoc HTML, IDE help, etc.
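One possible shape for this, as a sketch only: the field descriptions move from `//` comments into a class-level `/** ... */` block. `LeasedToAnotherStatusSketch` is a hypothetical, Lombok-free stand-in for the PR's `LeasedToAnotherStatus`, and the wording of the descriptions is paraphrased from the inline comments in the hunk.

```java
/**
 * Indicates that the flow action event already has a valid lease owned by another participant.
 * <ul>
 *   <li>{@code eventTimeMillis}: the timestamp the lease is associated with; for the same flow
 *       action it may differ from the timestamp of the event the caller was handling</li>
 *   <li>{@code minimumLingerDurationMillis}: the minimum amount of time to wait before returning
 *       to check whether the lease has completed or expired</li>
 * </ul>
 */
final class LeasedToAnotherStatusSketch {
  private final long eventTimeMillis;
  private final long minimumLingerDurationMillis;

  LeasedToAnotherStatusSketch(long eventTimeMillis, long minimumLingerDurationMillis) {
    this.eventTimeMillis = eventTimeMillis;
    this.minimumLingerDurationMillis = minimumLingerDurationMillis;
  }

  long getEventTimeMillis() {
    return eventTimeMillis;
  }

  long getMinimumLingerDurationMillis() {
    return minimumLingerDurationMillis;
  }
}
```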
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
/**
* Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a
given time
* for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
* to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
* the lease as completed by calling the
- * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not
gain
* the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
* previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
* cases.
*/
-public class SchedulerLeaseAlgoHandler {
- private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
- private final int staggerUpperBoundSec;
+@Slf4j
+public class FlowTriggerHandler {
+ private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
protected JobScheduler jobScheduler;
protected SchedulerService schedulerService;
protected DagActionStore dagActionStore;
private MetricContext metricContext;
private ContextAwareMeter numLeasesCompleted;
+
@Inject
- public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ // TODO: should multiActiveLeaseArbiter and DagActionStore be optional?
Review Comment:
I wouldn't think so... but do elaborate on your thought process
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
/**
* Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a
given time
* for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
* to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
* the lease as completed by calling the
- * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not
gain
* the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
* previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
* cases.
*/
-public class SchedulerLeaseAlgoHandler {
- private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
- private final int staggerUpperBoundSec;
+@Slf4j
+public class FlowTriggerHandler {
+ private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
protected JobScheduler jobScheduler;
protected SchedulerService schedulerService;
protected DagActionStore dagActionStore;
private MetricContext metricContext;
private ContextAwareMeter numLeasesCompleted;
+
@Inject
- public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ // TODO: should multiActiveLeaseArbiter and DagActionStore be optional?
+ public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
- this.staggerUpperBoundSec = ConfigUtils.getInt(config,
- ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
- ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
this.multiActiveLeaseArbiter = leaseDeterminationStore;
this.jobScheduler = jobScheduler;
this.schedulerService = schedulerService;
this.dagActionStore = dagActionStore;
this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
this.getClass());
- this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_FLOWS_SUBMITTED);
Review Comment:
looks like the constant needs renaming to
`GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_SUBMITTED`
(shall we update `numLeasesCompleted` as well?)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
/**
* Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a
given time
* for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
* to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
* the lease as completed by calling the
- * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not
gain
Review Comment:
I prefer the javadoc `{@link}` you had previously (for a method, the form is
`{@link MysqlMultiActiveLeaseArbiter#recordLeaseSuccess}`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
Review Comment:
since this represents a gaping hole in our impl, it's actually more
appropriate to scream/panic/freak out w/ a `RuntimeException`, than it is to
presume to continue (as if completely shirking all responsibility for the
`flowAction`)
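A minimal sketch of the fail-loudly dispatch, with hypothetical stand-in types (`LeaseAttemptSketch` and its subclasses) and string results in place of the real handler calls. Note that in the hunk above the `LeasedToAnotherStatus` branch has no `return`, so it would also reach the final statement; the sketch returns from every handled branch so that only a truly unknown type gets there. `IllegalStateException` is one choice of `RuntimeException`, not something the PR prescribes.

```java
// Hypothetical stand-ins for the LeaseAttemptStatus hierarchy in MultiActiveLeaseArbiter.
abstract class LeaseAttemptSketch {}

final class LeaseObtainedSketch extends LeaseAttemptSketch {}

final class LeasedToAnotherSketch extends LeaseAttemptSketch {}

final class NoLongerLeasingSketch extends LeaseAttemptSketch {}

final class TriggerDispatchSketch {
  private TriggerDispatchSketch() {}

  // Returns a label for the action taken; the real handler would call its own methods instead.
  static String handle(LeaseAttemptSketch status) {
    if (status instanceof LeaseObtainedSketch) {
      return "persistFlowAction";
    } else if (status instanceof LeasedToAnotherSketch) {
      return "scheduleReminderForEvent";
    } else if (status instanceof NoLongerLeasingSketch) {
      return "nothingToDo";
    }
    // An unknown subtype means the handler is incomplete: fail loudly instead of logging and moving on.
    throw new IllegalStateException(
        "Unhandled lease attempt status type: " + status.getClass().getName());
  }
}
```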
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
- String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
try {
- LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+ log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
Review Comment:
update the prefix to "Flow Trigger Handler - "
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
- String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
try {
- LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+ log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime());
this.schedulerService.getScheduler().scheduleJob(trigger);
} catch (SchedulerException e) {
- LOG.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+ log.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, status.getEventTimeMillis(), e);
}
- LOG.info(String.format("Scheduler Lease Algo Handler - [%s,
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+ log.info(String.format("Scheduler Lease Algo Handler - [%s,
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
Review Comment:
(also here)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
Review Comment:
it may be necessary to overload this w/ a form taking
`LeaseObtainedStatus`...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
try {
// If an existing resume request is still pending then do not accept
this request
if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME,
- new RuntimeException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait for"
- + " action to be completed."));
+ this.handleException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait "
Review Comment:
nit: `handleError` / `prepareError` (?)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -166,8 +165,8 @@ public
GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
Orchestrator orchestrator, SchedulerService schedulerService,
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
- @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
multiActiveSchedulerEnabled,
- SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) throws Exception {
+ @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+ Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) throws Exception
{
Review Comment:
rename `schedulerLeaseAlgoHandler` (its type is now `FlowTriggerHandler`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
try {
// If an existing resume request is still pending then do not accept
this request
if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME,
- new RuntimeException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait for"
- + " action to be completed."));
+ this.handleException("There is already a pending RESUME action for
this flow. Please wait to resubmit and wait "
+ + "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
} catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s
%s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
DagActionStore.FlowActionType.RESUME, e);
+ this.handleException(e.getMessage(),
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
}
- private void handleException (String flowGroup, String flowName, String
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
- try {
- if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId,
flowActionType)) {
- throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
e.getMessage());
- } else {
- throw new
RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
- }
- } catch (IOException | SQLException ex) {
- throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
e.getMessage());
+ private void handleException(String exceptionMessage, HttpStatus errorType) {
+ if (errorType == HttpStatus.S_409_CONFLICT) {
+ throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
exceptionMessage);
+ } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
+ new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);
Review Comment:
the other two throw... should this not?
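To illustrate the concern with a sketch: an object that is constructed but neither returned nor thrown has no effect on the caller, so the 400 branch would fall through silently. The version below assumes the intent is for every status to surface as an error. `Status` and `ServiceException` are hypothetical stand-ins, not the rest.li classes, and `handleError` just borrows the name floated above.

```java
// Hypothetical stand-ins for HttpStatus and RestLiServiceException, to keep the sketch self-contained.
final class HandleErrorSketch {
  private HandleErrorSketch() {}

  enum Status { CONFLICT_409, BAD_REQUEST_400, INTERNAL_SERVER_ERROR_500 }

  static final class ServiceException extends RuntimeException {
    final Status status;

    ServiceException(Status status, String message) {
      super(message);
      this.status = status;
    }
  }

  // Every status is thrown, so no branch can be dropped by accident.
  static void handleError(String message, Status status) {
    throw new ServiceException(status, message);
  }
}
```

If the 400 case is instead meant to produce a response object, the method would need to return it rather than discard it.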
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
* host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
* further leasing should be done for the event.
*/
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
/** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
@FunctionalInterface
protected interface CheckedFunction<T, R> {
R apply(T t) throws IOException, SQLException;
}
- public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+ public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";
Review Comment:
better to define the prefix in `ConfigurationKeys`
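A minimal sketch of what that could look like, reusing the `MYSQL_LEASE_ARBITER_PREFIX` name suggested earlier in this review. The holder class `LeaseArbiterKeys` is hypothetical (it stands in for `ConfigurationKeys`), and the two key suffixes are copied from this PR's diff.

```java
// Hypothetical holder standing in for ConfigurationKeys; only the shared-prefix idea matters here.
final class LeaseArbiterKeys {
  private LeaseArbiterKeys() {}

  public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";

  // Each key is derived from the prefix, so renaming the prefix touches one line.
  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
}
```

`CONFIG_PREFIX` in the arbiter could then simply refer to the shared constant instead of repeating the literal.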
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -199,23 +197,27 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, l
int leaseValidityStatus = resultSet.getInt(4);
Review Comment:
hopefully doesn't feel like overkill, but I'd abstract this by defining a
`static` inner `@Data` class with an overloaded constructor (or `static`
factory method) taking a `ResultSet`
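Roughly, as a sketch without Lombok so it stands alone: only column 4 (`leaseValidityStatus`, read with `getInt`) comes from the diff. The class name `LeaseRowSketch`, the `eventTimestampMillis` field, and its column index and `getLong` accessor are assumptions made for illustration.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical value class; with Lombok this would be a static inner @Data class.
final class LeaseRowSketch {
  private final long eventTimestampMillis; // assumed: column 1, read as a long
  private final int leaseValidityStatus;   // column 4, as in the diff

  private LeaseRowSketch(long eventTimestampMillis, int leaseValidityStatus) {
    this.eventTimestampMillis = eventTimestampMillis;
    this.leaseValidityStatus = leaseValidityStatus;
  }

  // Static factory: all column-index knowledge lives in this one place.
  static LeaseRowSketch fromResultSet(ResultSet rs) throws SQLException {
    return new LeaseRowSketch(rs.getLong(1), rs.getInt(4));
  }

  long getEventTimestampMillis() {
    return eventTimestampMillis;
  }

  int getLeaseValidityStatus() {
    return leaseValidityStatus;
  }
}
```

`tryAcquireLease` would then work with named getters rather than positional `resultSet.getInt(4)` calls.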
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -100,8 +114,8 @@ public class MySQLMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Insert or update row to acquire lease if values have not changed since
the previous read
// Need to define three separate statements to handle cases where row does
not exist or has null values to check
protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
- + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
- + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?); "
+ + SELECT_AFTER_INSERT_STATEMENT;
Review Comment:
just thinking... it may be clearer not to append this to the constant, but
rather to concatenate them at point of use
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
}
}
- private void clearUpDagAction(DagId dagId) throws IOException {
+ private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType
flowActionType) throws IOException {
Review Comment:
nit: "clear up" seems indirect and w/ a wide variety of connotations (from
misunderstandings and weather... to skin care).
what are we doing here? "resolving" the action? "dismissing"? "deleting"?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java:
##########
@@ -581,7 +581,7 @@ public void close() throws IOException {
/**
* Get a {@link org.quartz.Trigger} from the given job configuration
properties.
*/
- public Trigger getTrigger(JobKey jobKey, Properties jobProps) {
+ public Trigger createTriggerForJob(JobKey jobKey, Properties jobProps) {
Review Comment:
couldn't this be `static`? if so, let's!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
}
}
- private void clearUpDagAction(DagId dagId) throws IOException {
+ private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType
flowActionType) throws IOException {
if (this.dagActionStore.isPresent()) {
this.dagActionStore.get().deleteDagAction(
- new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId,
- DagActionStore.FlowActionType.KILL));
+ new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId, flowActionType));
Review Comment:
minor, but depending on how prevalent this is, it might be reasonable to
supply an additional constructor:
```
public DagAction(DagId dagId, FlowActionType flowActionType) { ... }
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -106,22 +104,20 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private FlowStatusGenerator flowStatusGenerator;
private UserQuotaManager quotaManager;
- private boolean isMultiActiveSchedulerEnabled;
- private SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler;
+ private Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler;
Review Comment:
rename `schedulerLeaseAlgoHandler` (its type is now `FlowTriggerHandler`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -317,26 +312,27 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
}
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
- if (this.isMultiActiveSchedulerEnabled) {
+ if (schedulerLeaseAlgoHandler.isPresent()) {
+ // If triggerTimestampMillis is 0, then it was not set by the job
trigger handler, and we cannot handle this event
+ if (triggerTimestampMillis == 0L) {
+ _log.warn("Skipping execution of spec: {} because missing trigger
timestamp in job properties",
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration
skipped because no trigger timestamp "
+ + "associated with flow action.");
+ if (this.eventSubmitter.isPresent()) {
+ new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
Review Comment:
tip: `ifPresent()`
(note: this works easily and naturally... unless the body throws checked exceptions)
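A small sketch of the pattern, assuming the field is a `java.util.Optional` (Guava's `com.google.common.base.Optional` has no `ifPresent`, so the `isPresent()`/`get()` form would stay in that case). `IfPresentSketch` and `notifyIfPresent` are hypothetical names, and a plain list stands in for the event submitter.

```java
import java.util.List;
import java.util.Optional;

final class IfPresentSketch {
  private IfPresentSketch() {}

  // Same behavior as: if (sink.isPresent()) { sink.get().add(message); }
  static void notifyIfPresent(Optional<List<String>> sink, String message) {
    sink.ifPresent(s -> s.add(message));
  }
}
```

The lambda passed to `ifPresent` is a `Consumer`, whose `accept` declares no checked exceptions, which is why a body that throws `IOException` does not fit without wrapping.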
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
Review Comment:
shouldn't we confirm the return value to determine whether to schedule a
reminder?
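One possible shape for that check, as a sketch only. `persistOrRemind` and its two parameters are hypothetical stand-ins for `persistFlowAction(...)` and a reminder-scheduling call; note that `scheduleReminderForEvent` currently accepts only `LeasedToAnotherStatus`, so the real version would need an overload or an adapter.

```java
import java.util.function.BooleanSupplier;

final class LeaseObtainedSketch {
  private LeaseObtainedSketch() {}

  // Returns true if the action was persisted; otherwise runs the reminder hook and returns false.
  static boolean persistOrRemind(BooleanSupplier persistFlowAction, Runnable scheduleReminder) {
    if (persistFlowAction.getAsBoolean()) {
      return true;
    }
    scheduleReminder.run();
    return false;
  }
}
```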
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -378,6 +374,26 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ public void submitFlowToDagManager(FlowSpec flowSpec,
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDag)
+ throws IOException {
+ if (!jobExecutionPlanDag.isPresent()) {
+ jobExecutionPlanDag = Optional.of(specCompiler.compileFlow(flowSpec));
+ }
Review Comment:
while `Optional` is generally great, it's only needed when callers don't know
whether they are holding on to a `Dag<>`. when the caller knows statically,
it's clearer to overload `submitFlowToDagManager` with both a unary and a
binary form.
here the unary form merely compiles the DAG before delegating to the binary
one:
```
public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException {
  submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
}
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
- String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
Review Comment:
would be ideal if this invocation could be `static`!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
Review Comment:
a. I like the `@link`
b. "a reminder for itself" => "a self-reminder"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config,
MultiActiveLeaseArbiter leaseDet
* @param eventTimeMillis
* @throws IOException
*/
- public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
- LeaseAttemptStatus leaseAttemptStatus =
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
// TODO: add a log event or metric for each of these cases
- switch (leaseAttemptStatus.getClass().getSimpleName()) {
- case "LeaseObtainedStatus":
- finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
- break;
- case "LeasedToAnotherStatus":
- scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
- break;
- case "NoLongerLeasingStatus":
- break;
- default:
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus)
leaseAttemptStatus, flowAction);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+ eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
}
+ log.warn("Received type of leaseAttemptStatus: {} not handled by this
method", leaseAttemptStatus.getClass().getName());
}
// Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
- private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
try {
this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
- if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
- // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numLeasesCompleted.mark();
- return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
- status.getMyLeaseAcquisitionTimestamp());
- }
- } catch (IOException | SQLException e) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction,
status);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
- // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
- return false;
}
/**
- * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
- * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
- * expire.
+ * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to
schedule a reminder for itself to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
* @param jobProps
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param originalEventTimeMillis the event timestamp we were originally
handling
* @param flowAction
*/
- private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
- String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
- // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
- Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
try {
- LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+ log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -
attempting to schedule reminder for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime());
this.schedulerService.getScheduler().scheduleJob(trigger);
} catch (SchedulerException e) {
- LOG.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+ log.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, status.getEventTimeMillis(), e);
}
- LOG.info(String.format("Scheduler Lease Algo Handler - [%s,
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
- flowAction, originalEventTimeMillis,
status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+ log.info(String.format("Scheduler Lease Algo Handler - [%s,
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime()));
}
/**
* These methods should only be called from the Orchestrator or JobScheduler
classes as it directly adds jobs to the
* Quartz scheduler
- * @param delayPeriodSeconds
+ * @param delayPeriodMillis
* @return
*/
- protected static String createCronFromDelayPeriod(long delayPeriodSeconds) {
+ protected static String createCronFromDelayPeriod(long delayPeriodMillis) {
LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
- LocalDateTime delaySecondsLater = now.plus(delayPeriodSeconds,
ChronoUnit.SECONDS);
+ LocalDateTime delaySecondsLater = now.plus(delayPeriodMillis,
ChronoUnit.MILLIS);
Review Comment:
nit: `delaySecondsLater` is misnamed: it's neither seconds nor an interval,
but an absolute time (now plus a delay given in millis)
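To illustrate the rename, here is a minimal sketch. The name `timeToScheduleReminder`, the wrapper class, and the overload taking `now` are hypothetical, and the cron pattern is an assumption since the rest of the method is not shown in this hunk:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical wrapper class, used only to keep the sketch self-contained
final class CronFromDelaySketch {
  // Assumed Quartz-style layout: sec min hour day-of-month month day-of-week year
  private static final DateTimeFormatter CRON_FORMAT =
      DateTimeFormatter.ofPattern("ss mm HH dd MM ? yyyy");

  // Hypothetical overload: `now` is passed in so the result is deterministic in tests
  public static String createCronFromDelayPeriod(LocalDateTime now, long delayPeriodMillis) {
    // This is an absolute point in time, so name it for what it is, not for the delay's unit
    LocalDateTime timeToScheduleReminder = now.plus(delayPeriodMillis, ChronoUnit.MILLIS);
    return timeToScheduleReminder.format(CRON_FORMAT);
  }

  // Same signature as in the diff; delegates using the current UTC time
  public static String createCronFromDelayPeriod(long delayPeriodMillis) {
    return createCronFromDelayPeriod(LocalDateTime.now(ZoneId.of("UTC")), delayPeriodMillis);
  }
}
```

Note that with a pattern like this the result only has one-second resolution, so any sub-second part of the millis delay is dropped when formatting.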
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -165,9 +161,8 @@ public Orchestrator(Config config,
Optional<TopologyCatalog> topologyCatalog, Op
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator,
Optional<TopologyCatalog> topologyCatalog,
- Optional<DagManager> dagManager, Optional<Logger> log,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
multiActiveSchedulerEnabled,
- SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) {
- this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true,
multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler);
+ Optional<DagManager> dagManager, Optional<Logger> log,
Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) {
Review Comment:
need to rename `schedulerLeaseAlgoHandler`, since its type is now
`FlowTriggerHandler`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
* host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
* further leasing should be done for the event.
*/
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
/** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
@FunctionalInterface
protected interface CheckedFunction<T, R> {
R apply(T t) throws IOException, SQLException;
}
- public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+ public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";
protected final DataSource dataSource;
private final String leaseArbiterTableName;
private final String constantsTableName;
private final int epsilon;
private final int linger;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
Review Comment:
suggest no default, since it probably shouldn't be possible to insert without
a specific `event_timestamp`
(the default for `lease_acq_tstamp` seems reasonable)
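Roughly what I have in mind, as a sketch only. The class name, the length constants, and the `lease_acquisition_timestamp` column name are hypothetical stand-ins, and the remaining columns and keys of the real table are omitted:

```java
// Hypothetical holder class, used only to keep the sketch self-contained
final class LeaseArbiterTableSketch {
  // Hypothetical stand-ins for the ServiceConfigKeys length constants
  static final int MAX_FLOW_GROUP_LENGTH = 128;
  static final int MAX_FLOW_NAME_LENGTH = 128;
  static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13;

  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
      + "flow_group varchar(" + MAX_FLOW_GROUP_LENGTH + ") NOT NULL, "
      + "flow_name varchar(" + MAX_FLOW_NAME_LENGTH + ") NOT NULL, "
      + "flow_execution_id varchar(" + MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
      + "flow_action varchar(100) NOT NULL, "
      // No default: every insert must supply the event timestamp explicitly
      + "event_timestamp TIMESTAMP NOT NULL, "
      // A default is reasonable here: the lease is acquired at insert time
      + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP)";

  // Substitutes the table name into the statement template
  public static String createTableStatement(String tableName) {
    return String.format(CREATE_LEASE_ARBITER_TABLE_STATEMENT, tableName);
  }
}
```

One thing worth verifying: depending on the MySQL version and the `explicit_defaults_for_timestamp` setting, the server may still assign an implicit default to the first `TIMESTAMP` column, so dropping the `DEFAULT` clause alone might not be enough everywhere.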
--
This is an automated message from the Apache Git Service.