homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1230000099


##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -96,16 +107,16 @@ public void setUp()
             ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
         .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, 
ConfigValueFactory.fromAnyRef("true")).resolve();
 
-    String zkConnectingString = 
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helixClusterName = 
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    zkConnectingString = 
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);

Review Comment:
   Nit: within this test we seem to use `this.` as a prefix for member 
variables, so let's follow that convention here.



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -96,16 +107,16 @@ public void setUp()
             ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
         .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, 
ConfigValueFactory.fromAnyRef("true")).resolve();
 
-    String zkConnectingString = 
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helixClusterName = 
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    zkConnectingString = 
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    helixClusterName = 
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
 
     HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
 
-    this.helixManager = HelixManagerFactory
+    HelixManager helixManager = HelixManagerFactory

Review Comment:
   Why do we create a local variable `helixManager` here? It doesn't seem to be 
used other than for connecting.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = 
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, 
Instant.now());
+      Duration difference = workflowDuration.minus(throttleTimeoutDuration);

Review Comment:
   Nit: This variable doesn't add much value here.  IMO something like
   ```
   workflowDuration.minus(throttleTimeoutDuration).isNegative()
   ```
   
   Seems reasonably readable. 
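
For illustration, here is a rough sketch of the inlined check. The `ThrottleCheck` class and `isThrottled` helper are hypothetical names that are not in this PR, and `now` is passed in only to make the sketch easy to test; only the `java.time` calls are real API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper class for illustration; not code from the PR.
final class ThrottleCheck {
  private ThrottleCheck() {}

  // Returns true when the job started less than throttleTimeout before now,
  // i.e. when (running duration - timeout) is negative.
  static boolean isThrottled(Instant jobStartTime, Instant now, Duration throttleTimeout) {
    Duration workflowRunningDuration = Duration.between(jobStartTime, now);
    return workflowRunningDuration.minus(throttleTimeout).isNegative();
  }
}
```

Note that a running duration exactly equal to the timeout is not negative, so that case would not be throttled.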



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,9 +74,16 @@ public class GobblinHelixJobSchedulerTest {
   private GobblinTaskRunner gobblinTaskRunner;
 
   private Thread thread;
-
   private final String workflowIdSuffix1 = "_1504201348471";
   private final String workflowIdSuffix2 = "_1504201348472";
+  private final String workflowIdSuffix3 = "_1504201348473";
+
+  private Instant beginTime = Instant.ofEpochMilli(0);
+  private Instant shortPeriod = Instant.ofEpochMilli(1);

Review Comment:
   Nit: "Period" doesn't seem quite right here, since these are `Instant`s 
(points in time) rather than lengths of time. Maybe something like 
`shortTimeLater` and `longTimeLater` would make more sense.



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = 
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, 
Instant.now());

Review Comment:
   Maybe `workflowRunningDuration` is a more descriptive name. @ZihanLi58, can 
you chime in here?



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + 
"containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = 
GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+  public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = 
"helix.job.scheduling.throttle.enabled";
+  public static final boolean 
DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;

Review Comment:
   We call this `Throttle` because it prevents frequent replanner restarts 
from executing in the Gobblin AM. In other words, once a Helix workflow has 
been computed by a regular start-up, the replanner, or the nurse replanner, 
there is no reason to recompute and resubmit it right away.
   
   These workflows have a significant "cold" start time, so there is no 
benefit in allowing them to trigger shortly one after another.
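
A minimal sketch of the throttling idea described above, under stated assumptions: the `JobThrottle` class, its `tryStart` method, and the `jobNameToStartTime` field name are all hypothetical, and the real scheduler in this PR is structured differently. It only illustrates "skip a resubmission that arrives too soon after the last start".

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the throttle; not the PR's implementation.
final class JobThrottle {
  private final Duration throttleTimeout;
  // Key: Gobblin job name. Value: when the job was last allowed to start.
  private final ConcurrentHashMap<String, Instant> jobNameToStartTime = new ConcurrentHashMap<>();

  JobThrottle(Duration throttleTimeout) {
    this.throttleTimeout = throttleTimeout;
  }

  // Returns true and records the start time if the job may start now;
  // returns false if the previous start was less than throttleTimeout ago.
  // Note: the get-then-put below is not atomic, which is acceptable for a
  // sketch but would need care if two updates for one job can race.
  boolean tryStart(String jobName, Instant now) {
    Instant previousStart = jobNameToStartTime.get(jobName);
    if (previousStart != null
        && Duration.between(previousStart, now).compareTo(throttleTimeout) < 0) {
      return false;
    }
    jobNameToStartTime.put(jobName, now);
    return true;
  }
}
```

Passing `now` in explicitly, rather than calling `Instant.now()` inside, is just a choice that keeps the sketch deterministic in tests.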



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Review Comment:
   Also, just my opinion, but `Instant` (or I guess `Timestamp`) is less 
error-prone to write code against than millis. 
   
   I don't have an opinion on changing the above `Duration` to a millis `long` 
to fit the rest of the class. But `Instant` vs. `long` is a big deal because a 
`long` is hard to reason about: it often refers to epoch millis, but you always 
have to add `EpochMillis` to the name of the map to make that clear. 
   
   As for `java.time.Instant` vs. `java.sql.Timestamp`, I've seen `Instant` 
used elsewhere and personally haven't seen `Timestamp` used much. So there are 
clearly multiple pockets of usage, and either one probably makes sense.
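
To make the `long` vs. `Instant` point concrete, here is a small sketch. The `ElapsedTime` class and both method names are hypothetical and exist only for comparison; they are not from the codebase.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical comparison of the two styles; not code from the PR.
final class ElapsedTime {
  private ElapsedTime() {}

  // With long, the unit lives only in the parameter names. Passing seconds
  // or nanos by mistake still compiles and silently gives a wrong answer.
  static long elapsedMillis(long startEpochMillis, long nowEpochMillis) {
    return nowEpochMillis - startEpochMillis;
  }

  // With Instant and Duration, the types carry the meaning and the unit is
  // chosen explicitly only when converting back to a number.
  static Duration elapsed(Instant start, Instant now) {
    return Duration.between(start, now);
  }
}
```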



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
+  private final Duration throttleTimeoutDuration;
+  private ConcurrentHashMap<String, Instant> jobStartTimeMap;

Review Comment:
   +1 to adjusting the map name. Pretty sure the key is the jobName, which 
refers to the Gobblin configuration `job.name`.
   
   



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
   @Subscribe
   public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent 
updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
+    String jobName = updateJobArrival.getJobName();
+    boolean throttleEnabled = 
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+        
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+        
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+    if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+      Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+      Duration workflowDuration = Duration.between(jobStartTime, 
Instant.now());
+      Duration difference = workflowDuration.minus(throttleTimeoutDuration);
+      if (difference.isNegative()) {

Review Comment:
   +1 to adding a log statement for when this branch executes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
