[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867597
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jun/23 22:20
Start Date: 26/Jun/23 22:20
Worklog Time Spent: 10m
Work Description: homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1242790138
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ listener);
} else {
- LOGGER.info("No job schedule found, so running job " + jobUri);
+ LOGGER.info("No job schedule"
+ + " found, so running job " + jobUri);
Review Comment:
nit: Does not need to be on a new line
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ listener);
} else {
- LOGGER.info("No job schedule found, so running job " + jobUri);
+ LOGGER.info("No job schedule"
+ + " found, so running job " + jobUri);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ listener));
}
+
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
}
}
@Subscribe
- public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
+ public synchronized void
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+
+ if (this.isThrottleEnabled &&
+ this.jobNameToNextSchedulableTime.getOrDefault(jobName,
Instant.ofEpochMilli(0)).isAfter(clock.instant())) {
Review Comment:
Nit: This line is a bit dense. Also, to represent the beginning of time,
`Instant` provides the constants `Instant.MIN` and `Instant.EPOCH`, which are
more readable than `Instant.ofEpochMilli(0)`.
Also, "nextSchedulableTime is after the current time" reads a little oddly. I
find it more intuitive to phrase it as "the current time is before
nextSchedulableTime", i.e.
```
clock.instant().isBefore(jobNameToNextSchedulableTime.getOrDefault(jobName,
Instant.ofEpochMilli(0)))
```
or IMO even more readable
```
Instant nextSchedulableTime =
jobNameToNextSchedulableTime.getOrDefault(jobName, Instant.MIN);
if (this.isThrottleEnabled && clock.instant().isBefore(nextSchedulableTime))
{
...
```
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private Duration helixJobSchedulingThrottleTimeout;
+ private Clock clock;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap jobNameToNextSchedulableTime, Duration
helixJobSchedulingThrottleTimeout, Clock clock) {
+ super(jobLauncherMetrics);
+ this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+ this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+ this.clock = clock;
+ }
+
+ @Override
+ public void onJobPrepare(JobContext jobContext)
+ throws Exception {
+ super.onJobPrepare(jobContext);
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished prepare. The next
schedulable time is " + nextSchedulableTime );
+ }
+
+ @Override
+ public void onJobCompletion(JobContext jobContext)
+ throws Exception {
+ super.onJobCompletion(jobContext);
+ if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
Instant.ofEpochMilli(0));
+ } else {
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished completion. The next
schedulable time is " + nextSchedulableTime );
+ }
+ }
+
+ @Override
+ public void onJobCancellation(JobContext jobContext)
+ throws Exception {
+ super.onJobCancellation(jobContext);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
Instant.ofEpochMilli(0));
Review Comment:
Same as for the job-failed case: we'd want something similar here.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
private GobblinTaskRunner gobblinTaskRunner;
private Thread thread;
-
private final String workflowIdSuffix1 = "_1504201348471";
private final String workflowIdSuffix2 = "_1504201348472";
+ private Instant beginTime = Instant.ofEpochMilli(0);
+ private Instant withinThrottlePeriod = beginTime.plus(1, ChronoUnit.SECONDS);
Review Comment:
Nit: these fields can be `final`.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ listener);
} else {
- LOGGER.info("No job schedule found, so running job " + jobUri);
+ LOGGER.info("No job schedule"
+ + " found, so running job " + jobUri);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ listener));
Review Comment:
nit: Does not need to be on a new line
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private Duration helixJobSchedulingThrottleTimeout;
+ private Clock clock;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap jobNameToNextSchedulableTime, Duration
helixJobSchedulingThrottleTimeout, Clock clock) {
+ super(jobLauncherMetrics);
+ this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+ this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+ this.clock = clock;
+ }
+
+ @Override
+ public void onJobPrepare(JobContext jobContext)
+ throws Exception {
+ super.onJobPrepare(jobContext);
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished prepare. The next
schedulable time is " + nextSchedulableTime );
+ }
+
+ @Override
+ public void onJobCompletion(JobContext jobContext)
+ throws Exception {
+ super.onJobCompletion(jobContext);
+ if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
Instant.ofEpochMilli(0));
Review Comment:
Also, instead of ofEpochMilli(0), let's use `Instant.EPOCH`
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ listener);
Review Comment:
nit: Does not need to be on a new line
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private Duration helixJobSchedulingThrottleTimeout;
+ private Clock clock;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap jobNameToNextSchedulableTime, Duration
helixJobSchedulingThrottleTimeout, Clock clock) {
+ super(jobLauncherMetrics);
+ this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+ this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+ this.clock = clock;
+ }
+
+ @Override
+ public void onJobPrepare(JobContext jobContext)
+ throws Exception {
+ super.onJobPrepare(jobContext);
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished prepare. The next
schedulable time is " + nextSchedulableTime );
+ }
+
+ @Override
+ public void onJobCompletion(JobContext jobContext)
+ throws Exception {
+ super.onJobCompletion(jobContext);
+ if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
Instant.ofEpochMilli(0));
+ } else {
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished completion. The next
schedulable time is " + nextSchedulableTime );
Review Comment:
Maybe `is completed` instead of `finished completion`?
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -315,24 +344,39 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI,
jobUri);
this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
-
+ GobblinHelixJobLauncherListener listener = isThrottleEnabled ?
+ new GobblinThrottlingHelixJobLauncherListener(this.launcherMetrics,
jobNameToNextSchedulableTime,
+ jobSchedulingThrottleTimeout, clock)
+ : new GobblinHelixJobLauncherListener(this.launcherMetrics);
if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + jobUri);
scheduleJob(jobProps,
- new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ listener);
} else {
- LOGGER.info("No job schedule found, so running job " + jobUri);
+ LOGGER.info("No job schedule"
+ + " found, so running job " + jobUri);
this.jobExecutor.execute(new NonScheduledJobRunner(jobProps,
- new
GobblinHelixJobLauncherListener(this.launcherMetrics)));
+ listener));
}
+
Review Comment:
nit: This extra blank line is not needed.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -443,6 +487,10 @@ private void
cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
}
}
+ public void setThrottleEnabled(boolean throttleEnabled) {
Review Comment:
Use Lombok's `@Setter` instead of declaring the setter by hand. Also, why do we
need this? Isn't this value settable via the config, even for testing?
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private Duration helixJobSchedulingThrottleTimeout;
+ private Clock clock;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap jobNameToNextSchedulableTime, Duration
helixJobSchedulingThrottleTimeout, Clock clock) {
+ super(jobLauncherMetrics);
+ this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+ this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+ this.clock = clock;
+ }
+
+ @Override
+ public void onJobPrepare(JobContext jobContext)
+ throws Exception {
+ super.onJobPrepare(jobContext);
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished prepare. The next
schedulable time is " + nextSchedulableTime );
Review Comment:
Nit: grammar; this should read "finished preparing".
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private Duration helixJobSchedulingThrottleTimeout;
+ private Clock clock;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap jobNameToNextSchedulableTime, Duration
helixJobSchedulingThrottleTimeout, Clock clock) {
+ super(jobLauncherMetrics);
+ this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+ this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+ this.clock = clock;
+ }
+
+ @Override
+ public void onJobPrepare(JobContext jobContext)
+ throws Exception {
+ super.onJobPrepare(jobContext);
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished prepare. The next
schedulable time is " + nextSchedulableTime );
Review Comment:
Remove the extra whitespace after `nextSchedulableTime` (before the closing parenthesis).
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinThrottlingHelixJobLauncherListener.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.gobblin.cluster;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * A job listener used when {@link GobblinHelixJobLauncher} launches a job.
+ * In {@link GobblinHelixJobScheduler}, when throttling is enabled, this
+ * listener would record jobName to next schedulable time to decide whether
+ * the replanning should be executed or skipped.
+ */
+public class GobblinThrottlingHelixJobLauncherListener extends
GobblinHelixJobLauncherListener {
+
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinThrottlingHelixJobLauncherListener.class);
+ private ConcurrentHashMap<String, Instant> jobNameToNextSchedulableTime;
+ private Duration helixJobSchedulingThrottleTimeout;
+ private Clock clock;
+
+ public
GobblinThrottlingHelixJobLauncherListener(GobblinHelixJobLauncherMetrics
jobLauncherMetrics,
+ ConcurrentHashMap jobNameToNextSchedulableTime, Duration
helixJobSchedulingThrottleTimeout, Clock clock) {
+ super(jobLauncherMetrics);
+ this.jobNameToNextSchedulableTime = jobNameToNextSchedulableTime;
+ this.helixJobSchedulingThrottleTimeout = helixJobSchedulingThrottleTimeout;
+ this.clock = clock;
+ }
+
+ @Override
+ public void onJobPrepare(JobContext jobContext)
+ throws Exception {
+ super.onJobPrepare(jobContext);
+ Instant nextSchedulableTime =
clock.instant().plus(helixJobSchedulingThrottleTimeout);
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
nextSchedulableTime);
+ LOG.info(jobContext.getJobName() + " finished prepare. The next
schedulable time is " + nextSchedulableTime );
+ }
+
+ @Override
+ public void onJobCompletion(JobContext jobContext)
+ throws Exception {
+ super.onJobCompletion(jobContext);
+ if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
+ jobNameToNextSchedulableTime.put(jobContext.getJobName(),
Instant.ofEpochMilli(0));
Review Comment:
Add a log if the job failed. There is already an existing log for the entire
job context, but a log specifically from the throttling scheduler would be
important here for anyone unfamiliar with the code who is debugging it. See:
https://github.com/apache/gobblin/blob/702cadf48f910c79b129032aa673f08ce4397c03/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/AbstractJobListener.java#L59-L63
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import static org.mockito.Mockito.when;
+
/**
* Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
*
*/
-@Test(groups = {"gobblin.cluster"})
+
+/**
+ * In all test cases, we use GobblinHelixManagerFactory instead of
+ * HelixManagerFactory, and use HelixManager as a local variable to avoid
+ * the HelixManager (ZkClient) is not connected error when that's set as
Review Comment:
Potentially better wording:
> and instantiate a local HelixManager per test to provide isolation and
prevent errors caused by the ZKClient being shared (e.g. ZKClient is not
connected exceptions).
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,10 +87,18 @@ public class GobblinHelixJobSchedulerTest {
private GobblinTaskRunner gobblinTaskRunner;
private Thread thread;
-
private final String workflowIdSuffix1 = "_1504201348471";
private final String workflowIdSuffix2 = "_1504201348472";
+ private Instant beginTime = Instant.ofEpochMilli(0);
Review Comment:
`Instant.EPOCH`
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -52,16 +62,23 @@
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import static org.mockito.Mockito.when;
+
/**
* Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
*
*/
-@Test(groups = {"gobblin.cluster"})
+
Review Comment:
Merge this with the Javadoc above.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
this.thread.start();
}
+ // Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ // Job will be updated
Review Comment:
Comments describing a method should be written as Javadoc rather than as
regular comments.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
this.thread.start();
}
+ // Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ // Time span is within throttle timeout, within same workflow, throttle is
enabled
+ // Job will not be updated
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateSameWorkflowShortPeriodThrottle()
throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ // Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ // Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ // Time span is within throttle timeout, within different workflow, throttle
is enabled
+ // Job will be updated
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ // Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ // Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ // Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ GobblinHelixJobScheduler gobblinHelixJobScheduler;
+ if (isThrottleEnabled) {
+ gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ }
+ else {
+ gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
+ }
+ gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);
Review Comment:
nit: wouldn't we want to set this via config?
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
this.thread.start();
}
+ // Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ // Time span is within throttle timeout, within same workflow, throttle is
enabled
+ // Job will not be updated
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateSameWorkflowShortPeriodThrottle()
throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ // Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ // Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ // Time span is within throttle timeout, within different workflow, throttle
is enabled
+ // Job will be updated
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ // Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ // Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ // Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ GobblinHelixJobScheduler gobblinHelixJobScheduler;
+ if (isThrottleEnabled) {
+ gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager,
java.util.Optional.empty(),
Review Comment:
We can inject the clock regardless of whether throttling is enabled. IMO, we'd
never want to use a UTC clock in a unit test.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -129,58 +148,176 @@ public void setUp()
this.thread.start();
}
+ // Time span exceeds throttle timeout, within same workflow, throttle is
enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, true);
+ }
+
+ // Time span is within throttle timeout, within same workflow, throttle is
enabled
+ // Job will not be updated
@Test
- public void testNewJobAndUpdate()
+ public void testUpdateSameWorkflowShortPeriodThrottle()
throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix1,
+ true, true);
+ }
+
+ // Time span exceeds throttle timeout, within same workflow, throttle is not
enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateSameWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ // Time span is within throttle timeout, within same workflow, throttle is
not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateSameWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, true);
+ }
+
+ // Time span is within throttle timeout, within different workflow, throttle
is enabled
+ // Job will be updated
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ // Time span is within throttle timeout, within different workflow, throttle
is not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(withinThrottlePeriod,
"UpdateDiffWorkflowShortPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ // Time span exceeds throttle timeout, within different workflow, throttle
is enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ true, false);
+ }
+
+ // Time span exceeds throttle timeout, within different workflow, throttle
is not enabled
+ // Job will be updated
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ runWorkflowTest(exceedsThrottlePeriod,
"UpdateDiffWorkflowLongPeriodNoThrottle",
+ workflowIdSuffix1, workflowIdSuffix2, workflowIdSuffix2,
+ false, false);
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager, boolean isThrottleEnabled, Clock clock) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ GobblinHelixJobScheduler gobblinHelixJobScheduler;
+ if (isThrottleEnabled) {
+ gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog, clock);
+ }
+ else {
+ gobblinHelixJobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
+ }
+ gobblinHelixJobScheduler.setThrottleEnabled(isThrottleEnabled);
+ return gobblinHelixJobScheduler;
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ return newJobConfigArrivalEvent;
+ }
+
+ private void connectAndAssertWorkflowId(String expectedSuffix,
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+ Assert.assertNotNull(workFlowId);
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- String workFlowId = null;
+ private String getWorkflowID (NewJobConfigArrivalEvent
newJobConfigArrivalEvent, HelixManager helixManager)
+ throws Exception {
+ // endTime is manually set time period that we allow HelixUtils to fetch
workflowIdMap before timeout
Review Comment:
Maybe better wording:
> Poll Helix for up to 30 seconds until a workflow with a matching job name
exists, then return that workflow ID.
##########
gobblin-cluster/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker:
##########
@@ -0,0 +1 @@
+mock-maker-inline
Review Comment:
I don't think we need this anymore, since we are no longer mocking any static
classes.
Issue Time Tracking
-------------------
Worklog Id: (was: 867597)
Time Spent: 2h 40m (was: 2.5h)
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)