[ 
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867878
 ]

ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Jun/23 22:00
            Start Date: 27/Jun/23 22:00
    Worklog Time Spent: 10m 
      Work Description: homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244406476


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -357,19 +370,22 @@ public synchronized void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJ
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);

Review Comment:
   Update this log to say that you are resetting the clock



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -264,59 +264,61 @@ private NewJobConfigArrivalEvent 
createJobConfigArrivalEvent(Properties properti
     return newJobConfigArrivalEvent;
   }
 
-  private void connectAndAssertWorkflowId(String expectedSuffix, 
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) 
throws Exception {
+  private void connectAndAssertWorkflowId(String expectedSuffix, String 
jobName, HelixManager helixManager) throws Exception {
     helixManager.connect();
-    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    String workFlowId = getWorkflowID(jobName, helixManager);
     Assert.assertNotNull(workFlowId);
     Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
   }
 
-  private String getWorkflowID (NewJobConfigArrivalEvent 
newJobConfigArrivalEvent, HelixManager helixManager)
+  private String getWorkflowID (String jobName, HelixManager helixManager)
       throws Exception {
     // Poll helix for up to 30 seconds to fetch until a workflow with a 
matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
     Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
       try{
         workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
-            Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+            Collections.singletonList(jobName));
       } catch(GobblinHelixUnexpectedStateException e){
         continue;
       }
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
     return null;
   }
 
-  private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+  private void runWorkflowTest(Duration mockedTime, String jobSuffix,

Review Comment:
   Does `mockedTime` still make sense? Seems like it now represents a step 
duration for incrementing the clock forward in time



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -357,19 +370,22 @@ public synchronized void 
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJ
       }
     } catch (JobException je) {
       LOGGER.error("Failed to schedule or run job " + jobUri, je);
+      jobNameToNextSchedulableTime.put(jobUri, Instant.EPOCH);
     }
   }
 
   @Subscribe
   public synchronized void 
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
     LOGGER.info("Received update for job configuration of job " + 
updateJobArrival.getJobName());
-    String jobName = updateJobArrival.getJobName();
+    String jobUri = updateJobArrival.getJobName();

Review Comment:
   Let's not mix up the usage of job uri and job name. If you are gonna use job 
name (e.g. jobNameToNextSchedulableTime), then use the term job name 
everywhere. And if you are gonna use job uri, then change it for all of them 



##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -264,59 +264,61 @@ private NewJobConfigArrivalEvent 
createJobConfigArrivalEvent(Properties properti
     return newJobConfigArrivalEvent;
   }
 
-  private void connectAndAssertWorkflowId(String expectedSuffix, 
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager ) 
throws Exception {
+  private void connectAndAssertWorkflowId(String expectedSuffix, String 
jobName, HelixManager helixManager) throws Exception {
     helixManager.connect();
-    String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+    String workFlowId = getWorkflowID(jobName, helixManager);
     Assert.assertNotNull(workFlowId);
     Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
   }
 
-  private String getWorkflowID (NewJobConfigArrivalEvent 
newJobConfigArrivalEvent, HelixManager helixManager)
+  private String getWorkflowID (String jobName, HelixManager helixManager)
       throws Exception {
     // Poll helix for up to 30 seconds to fetch until a workflow with a 
matching job name exists in Helix and then return that workflowID
     long endTime = System.currentTimeMillis() + 30000;
     Map<String, String> workflowIdMap;
     while (System.currentTimeMillis() < endTime) {
       try{
         workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
-            Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+            Collections.singletonList(jobName));
       } catch(GobblinHelixUnexpectedStateException e){
         continue;
       }
-      if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
-        return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+      if (workflowIdMap.containsKey(jobName)) {
+        return workflowIdMap.get(jobName);
       }
       Thread.sleep(100);
     }
     return null;
   }
 
-  private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+  private void runWorkflowTest(Duration mockedTime, String jobSuffix,
     String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
     String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean 
isSameWorkflow) throws Exception {
     Clock mockClock = Mockito.mock(Clock.class);
-    AtomicInteger count = new AtomicInteger(0);
-    when(mockClock.instant()).thenAnswer(invocation -> count.getAndIncrement() 
== 0 ? beginTime : mockedTime);
+    AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+    when(mockClock.instant()).thenAnswer(invocation -> {
+      Instant currentInstant = nextInstant.get();
+      nextInstant.set(currentInstant.plus(mockedTime));

Review Comment:
   I noticed that you're using AtomicReference. But you are not doing an atomic 
get and set, which basically defeats the point of what you're doing. 
   
   Did you mean to do something like `getAndAccumulate`?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 867878)
    Time Spent: 3.5h  (was: 3h 20m)

> Helix Job scheduler should not try to replace running workflow if within 
> configured time
> ----------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1840
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to