[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=867878&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-867878
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jun/23 22:00
Start Date: 27/Jun/23 22:00
Worklog Time Spent: 10m
Work Description: homatthew commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1244406476
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -357,19 +370,22 @@ public synchronized void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJ
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
Review Comment:
Update this log to say that you are resetting the clock
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -264,59 +264,61 @@ private NewJobConfigArrivalEvent
createJobConfigArrivalEvent(Properties properti
return newJobConfigArrivalEvent;
}
- private void connectAndAssertWorkflowId(String expectedSuffix,
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
+ private void connectAndAssertWorkflowId(String expectedSuffix, String
jobName, HelixManager helixManager) throws Exception {
helixManager.connect();
- String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+ String workFlowId = getWorkflowID(jobName, helixManager);
Assert.assertNotNull(workFlowId);
Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
}
- private String getWorkflowID (NewJobConfigArrivalEvent
newJobConfigArrivalEvent, HelixManager helixManager)
+ private String getWorkflowID (String jobName, HelixManager helixManager)
throws Exception {
// Poll helix for up to 30 seconds to fetch until a workflow with a
matching job name exists in Helix and then return that workflowID
long endTime = System.currentTimeMillis() + 30000;
Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
try{
workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+ Collections.singletonList(jobName));
} catch(GobblinHelixUnexpectedStateException e){
continue;
}
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+ if (workflowIdMap.containsKey(jobName)) {
+ return workflowIdMap.get(jobName);
}
Thread.sleep(100);
}
return null;
}
- private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+ private void runWorkflowTest(Duration mockedTime, String jobSuffix,
Review Comment:
Does `mockedTime` still make sense? Seems like it now represents a step
duration for incrementing the clock forward in time
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -357,19 +370,22 @@ public synchronized void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJ
}
} catch (JobException je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
+ jobNameToNextSchedulableTime.put(jobUri, Instant.EPOCH);
}
}
@Subscribe
public synchronized void
handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
- String jobName = updateJobArrival.getJobName();
+ String jobUri = updateJobArrival.getJobName();
Review Comment:
Let's not mix up the usage of job uri and job name. If you are gonna use job
name (e.g. jobNameToNextSchedulableTime), then use the term job name
everywhere. And if you are gonna use job uri, then change it for all of them
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -264,59 +264,61 @@ private NewJobConfigArrivalEvent
createJobConfigArrivalEvent(Properties properti
return newJobConfigArrivalEvent;
}
- private void connectAndAssertWorkflowId(String expectedSuffix,
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
+ private void connectAndAssertWorkflowId(String expectedSuffix, String
jobName, HelixManager helixManager) throws Exception {
helixManager.connect();
- String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
+ String workFlowId = getWorkflowID(jobName, helixManager);
Assert.assertNotNull(workFlowId);
Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
}
- private String getWorkflowID (NewJobConfigArrivalEvent
newJobConfigArrivalEvent, HelixManager helixManager)
+ private String getWorkflowID (String jobName, HelixManager helixManager)
throws Exception {
// Poll helix for up to 30 seconds to fetch until a workflow with a
matching job name exists in Helix and then return that workflowID
long endTime = System.currentTimeMillis() + 30000;
Map<String, String> workflowIdMap;
while (System.currentTimeMillis() < endTime) {
try{
workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+ Collections.singletonList(jobName));
} catch(GobblinHelixUnexpectedStateException e){
continue;
}
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- return workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+ if (workflowIdMap.containsKey(jobName)) {
+ return workflowIdMap.get(jobName);
}
Thread.sleep(100);
}
return null;
}
- private void runWorkflowTest(Instant mockedTime, String jobSuffix,
+ private void runWorkflowTest(Duration mockedTime, String jobSuffix,
String newJobWorkflowIdSuffix, String updateWorkflowIdSuffix,
String assertUpdateWorkflowIdSuffix, boolean isThrottleEnabled, boolean
isSameWorkflow) throws Exception {
Clock mockClock = Mockito.mock(Clock.class);
- AtomicInteger count = new AtomicInteger(0);
- when(mockClock.instant()).thenAnswer(invocation -> count.getAndIncrement()
== 0 ? beginTime : mockedTime);
+ AtomicReference<Instant> nextInstant = new AtomicReference<>(beginTime);
+ when(mockClock.instant()).thenAnswer(invocation -> {
+ Instant currentInstant = nextInstant.get();
+ nextInstant.set(currentInstant.plus(mockedTime));
Review Comment:
I noticed that you're using AtomicReference. But you are not doing an atomic
get and set, which basically defeats the point of what you're doing.
Did you mean to do something like `getAndAccumulate`?
Issue Time Tracking
-------------------
Worklog Id: (was: 867878)
Time Spent: 3.5h (was: 3h 20m)
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)