homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1230000099
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -96,16 +107,16 @@ public void setUp()
ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
.withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE,
ConfigValueFactory.fromAnyRef("true")).resolve();
- String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
- String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
Review Comment:
nit: within this test, it seems we use `this.` as a prefix for member
variables, so let's follow that convention here.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -96,16 +107,16 @@ public void setUp()
ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
.withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE,
ConfigValueFactory.fromAnyRef("true")).resolve();
- String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
- String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
- this.helixManager = HelixManagerFactory
+ HelixManager helixManager = HelixManagerFactory
Review Comment:
Why do we create a local variable `helixManager` here? It seems it's not
used for anything other than connecting.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+ boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+ GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+ String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+ if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+ Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+ Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
+ Duration difference = workflowDuration.minus(throttleTimeoutDuration);
Review Comment:
Nit: this variable doesn't add much value here. IMO something like
```
workflowDuration.minus(throttleTimeoutDuration).isNegative()
```
seems reasonably readable.
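To make the suggestion concrete, here is a minimal sketch of the inlined check
next to a plain comparison. The class and method names are hypothetical and
are not part of this PR; the two forms should agree for ordinary durations
(the subtraction could overflow only for extreme values).

```java
import java.time.Duration;

// Hypothetical helper for illustration only; not code from the PR.
final class ThrottleCheck {
  private ThrottleCheck() {}

  // Inlined form from the suggestion: no intermediate `difference` variable.
  static boolean isWithinTimeoutInlined(Duration workflowDuration, Duration throttleTimeoutDuration) {
    return workflowDuration.minus(throttleTimeoutDuration).isNegative();
  }

  // Same question asked as a comparison, which avoids the subtraction.
  static boolean isWithinTimeoutCompared(Duration workflowDuration, Duration throttleTimeoutDuration) {
    return workflowDuration.compareTo(throttleTimeoutDuration) < 0;
  }
}
```

Either form returns false when the elapsed time equals the timeout exactly, so
the boundary behavior is the same.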
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,9 +74,16 @@ public class GobblinHelixJobSchedulerTest {
private GobblinTaskRunner gobblinTaskRunner;
private Thread thread;
-
private final String workflowIdSuffix1 = "_1504201348471";
private final String workflowIdSuffix2 = "_1504201348472";
+ private final String workflowIdSuffix3 = "_1504201348473";
+
+ private Instant beginTime = Instant.ofEpochMilli(0);
+ private Instant shortPeriod = Instant.ofEpochMilli(1);
Review Comment:
Nit: "Period" doesn't seem quite right in these names, since the fields are
`Instant`s (points in time), not lengths of time. Maybe something like
`shortTimeLater` and `longTimeLater` would make more sense.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+ boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+ GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+ String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+ if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+ Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+ Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
Review Comment:
Maybe `workflowRunningDuration` is a more descriptive name. @ZihanLi58, can
you chime in here?
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";
public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+ public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = "helix.job.scheduling.throttle.enabled";
+ public static final boolean DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;
Review Comment:
The reason we would call this `Throttle` is that we are preventing frequent
replanner restarts from executing in the Gobblin AM. In other words, once a
Helix workflow has been computed by a regular start-up, the replanner, or the
nurse replanner, there is no reason to recompute and resubmit it shortly
afterwards.
This is because these workflows have a significant "cold" start time, and
there would be no benefit from allowing them to trigger in quick succession.
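As a rough illustration of that idea, a throttle could be sketched as below.
This is only a sketch under assumptions: `JobSubmissionThrottle`,
`tryAcquire`, and the explicit `now` parameter are hypothetical names chosen
for the example, not the PR's implementation. Passing `now` in (rather than
calling `Instant.now()` inside) is just one way to keep the check easy to
test.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class JobSubmissionThrottle {
  private final Duration throttleTimeout;
  // Key: job name (the Gobblin job.name). Value: when the job was last submitted.
  private final ConcurrentMap<String, Instant> jobNameToStartTime = new ConcurrentHashMap<>();

  JobSubmissionThrottle(Duration throttleTimeout) {
    this.throttleTimeout = throttleTimeout;
  }

  // Returns true and records `now` if the job may be (re)submitted;
  // returns false if the previous submission is still inside the throttle window.
  boolean tryAcquire(String jobName, Instant now) {
    boolean[] allowed = {false};
    // compute() runs atomically per key, so concurrent callers cannot both pass the check.
    jobNameToStartTime.compute(jobName, (name, previousStart) -> {
      boolean windowElapsed = previousStart == null
          || Duration.between(previousStart, now).compareTo(throttleTimeout) >= 0;
      if (windowElapsed) {
        allowed[0] = true;
        return now;
      }
      return previousStart;
    });
    return allowed[0];
  }
}
```

A caller would typically invoke `tryAcquire(jobName, Instant.now())` and skip
the resubmission (ideally with a log line) when it returns false.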
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
+ private final Duration throttleTimeoutDuration;
+ private ConcurrentHashMap<String, Instant> jobStartTimeMap;
Review Comment:
Also, just my opinion, but `Instant` (or I guess `Timestamp`) is less
error-prone to write code with than millis.
I don't have an opinion on changing the above duration to a millis `long` to
fit the rest of the class. But `Instant` vs `long` is a big deal because a
`long` is hard to reason about: it often refers to epoch millis, but you
always have to add "epochMillis" to the name of the map to make that clear.
As for `java.time.Instant` vs `java.sql.Timestamp`, I've seen `Instant` used
elsewhere and personally haven't seen `Timestamp` used much. So clearly there
are multiple pockets of usage, and either one probably makes sense.
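To illustrate the difference, here is a small side-by-side sketch. The class,
method, and parameter names are hypothetical and exist only for this example.
With `long`, the unit and epoch have to live in the names; with `Instant` and
`Duration`, the types carry that information.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;

// Hypothetical comparison for illustration only; not code from the PR.
final class StartTimeExamples {
  private StartTimeExamples() {}

  // long-based: correctness depends on every caller agreeing that all values are epoch millis.
  static boolean isThrottledMillis(Map<String, Long> jobNameToStartEpochMillis, String jobName,
      long nowEpochMillis, long timeoutMillis) {
    Long startEpochMillis = jobNameToStartEpochMillis.get(jobName);
    return startEpochMillis != null && nowEpochMillis - startEpochMillis < timeoutMillis;
  }

  // Instant-based: the types say what the values mean, so the names can stay short.
  static boolean isThrottled(Map<String, Instant> jobNameToStartTime, String jobName,
      Instant now, Duration timeout) {
    Instant start = jobNameToStartTime.get(jobName);
    return start != null && Duration.between(start, now).compareTo(timeout) < 0;
  }
}
```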
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
+ private final Duration throttleTimeoutDuration;
+ private ConcurrentHashMap<String, Instant> jobStartTimeMap;
Review Comment:
+1 to adjusting the map name. Pretty sure the key is the jobName, which
refers to the Gobblin configuration `job.name`.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+ boolean throttleEnabled = PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+ GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+ String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+ if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+ Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+ Duration workflowDuration = Duration.between(jobStartTime, Instant.now());
+ Duration difference = workflowDuration.minus(throttleTimeoutDuration);
+ if (difference.isNegative()) {
Review Comment:
+1 to adding a log for when this branch executes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.