[
https://issues.apache.org/jira/browse/GOBBLIN-1840?focusedWorklogId=865665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865665
]
ASF GitHub Bot logged work on GOBBLIN-1840:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Jun/23 03:34
Start Date: 15/Jun/23 03:34
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3704:
URL: https://github.com/apache/gobblin/pull/3704#discussion_r1230375126
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -162,6 +167,11 @@ public GobblinHelixJobScheduler(Config sysConfig,
this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(sysConfig,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS)
* 1000;
+ this.throttleTimeoutDuration = Duration.of(ConfigUtils.getLong(sysConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_TIMEOUT_SECONDS_KEY,
Review Comment:
Rename this to specify units as well, e.g. `throttleTimeoutDurationSecs`.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
@Test
public void testNewJobAndUpdate()
throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
Review Comment:
Can you reuse `helixManager` across tests?
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+ boolean throttleEnabled =
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
Review Comment:
Does this default to false if the config is not provided? If not, provide a
default value.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -70,9 +74,16 @@ public class GobblinHelixJobSchedulerTest {
private GobblinTaskRunner gobblinTaskRunner;
private Thread thread;
-
private final String workflowIdSuffix1 = "_1504201348471";
private final String workflowIdSuffix2 = "_1504201348472";
+ private final String workflowIdSuffix3 = "_1504201348473";
+
+ private Instant beginTime = Instant.ofEpochMilli(0);
+ private Instant shortPeriod = Instant.ofEpochMilli(1);
+ private Instant longPeriod = Instant.ofEpochMilli(3600001);
Review Comment:
Perhaps rename these more clearly to `withinThrottlePeriod` and `exceedsThrottlePeriod`.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -110,6 +113,8 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
+ private final Duration throttleTimeoutDuration;
+ private ConcurrentHashMap<String, Instant> jobStartTimeMap;
Review Comment:
`jobUriToStartTimeMap`? Better if you can clarify what the string is. Also,
let's be consistent between the START/CREATE time you mention in the description. Why do
we use Instant rather than Timestamp or Long (milliseconds)? The latter is
typically what I see used in our code.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+ boolean throttleEnabled =
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
Review Comment:
usually booleans are easily identified with `is....` like
`isThrottleEnabled`
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java:
##########
@@ -333,6 +345,20 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
@Subscribe
public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent
updateJobArrival) {
LOGGER.info("Received update for job configuration of job " +
updateJobArrival.getJobName());
+ String jobName = updateJobArrival.getJobName();
+ boolean throttleEnabled =
PropertiesUtils.getPropAsBoolean(updateJobArrival.getJobConfig(),
+
GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
+
String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY));
+
+ if (throttleEnabled && this.jobStartTimeMap.containsKey(jobName)) {
+ Instant jobStartTime = this.jobStartTimeMap.get(jobName);
+ Duration workflowDuration = Duration.between(jobStartTime,
Instant.now());
+ Duration difference = workflowDuration.minus(throttleTimeoutDuration);
+ if (difference.isNegative()) {
Review Comment:
Add a comment here to say we skip recalculation if workflowDuration is <
throttleTimeoutDuration. It's also worth adding a log statement here so you can
track when replanning is skipped.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java:
##########
@@ -222,4 +222,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX +
"containerId";
public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX =
GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+ public static final String HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY =
"helix.job.scheduling.throttle.enabled";
+ public static final boolean
DEFAULT_HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY = false;
Review Comment:
Not sure the use of word `throttle` captures what this key is actually used
for. Phrases that make more sense to me
- Short circuit replanning
- Early exit
- Shortcut evaluation
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
@Test
public void testNewJobAndUpdate()
throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
Review Comment:
Tests should be easy to follow, ideally with a short comment above each test to
describe the case. The name is quite helpful, but an additional comment above or
within the code itself will help. I know that short period + no throttle means
-> do not skip, but you want to comment this behavior somewhere.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
@Test
public void testNewJobAndUpdate()
throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
Review Comment:
if there's no `properties2`, then just name this `properties`
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
@Test
public void testNewJobAndUpdate()
throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodNoThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ private GobblinHelixJobScheduler createJobScheduler(HelixManager
helixManager) throws Exception {
+ java.nio.file.Path p =
Files.createTempDirectory(GobblinHelixJobScheduler.class.getSimpleName());
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ ConfigValueFactory.fromAnyRef(p.toString()));
SchedulerService schedulerService = new SchedulerService(new Properties());
NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
jobCatalog.startAsync();
- GobblinHelixJobScheduler jobScheduler =
- new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
- new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
-
- final Properties properties1 =
- GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
-
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+ return new GobblinHelixJobScheduler(ConfigFactory.empty(), helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
+ }
+ private NewJobConfigArrivalEvent createJobConfigArrivalEvent(Properties
properties, String suffix) {
+
properties.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
NewJobConfigArrivalEvent newJobConfigArrivalEvent =
- new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
- jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
- properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
- "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
- Map<String, String> workflowIdMap;
- this.helixManager.connect();
+ new
NewJobConfigArrivalEvent(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties);
+ properties.setProperty(ConfigurationKeys.JOB_ID_KEY,
+ "job_" + properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
suffix);
+ return newJobConfigArrivalEvent;
+ }
- String workFlowId = null;
- long endTime = System.currentTimeMillis() + 30000;
- while (System.currentTimeMillis() < endTime) {
- workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
- Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
- if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
- workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
- break;
- }
- Thread.sleep(100);
- }
+ private void connectAndAssertWorkflowId(String expectedSuffix,
NewJobConfigArrivalEvent newJobConfigArrivalEvent, HelixManager helixManager )
throws Exception {
+ helixManager.connect();
+ String workFlowId = getWorkflowID(newJobConfigArrivalEvent, helixManager);
Assert.assertNotNull(workFlowId);
- Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+ Assert.assertTrue(workFlowId.endsWith(expectedSuffix));
+ }
- jobScheduler.handleUpdateJobConfigArrival(
- new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
- this.helixManager.connect();
- endTime = System.currentTimeMillis() + 30000;
+ private String getWorkflowID (NewJobConfigArrivalEvent
newJobConfigArrivalEvent, HelixManager helixManager )
+ throws Exception {
+ long endTime = System.currentTimeMillis() + 30000;
Review Comment:
Why 30000 here? It is not obvious. Add a comment to explain what end time is being
created.
##########
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java:
##########
@@ -132,57 +143,260 @@ public void setUp()
@Test
public void testNewJobAndUpdate()
throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "NewJobAndUpdate", workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowLongPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateSameWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateSameWorkflowShortPeriodNoThrottle",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+
properties1.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ connectAndAssertWorkflowId(workflowIdSuffix2, newJobConfigArrivalEvent,
helixManager);
+ }
+ }
+
+ public void testUpdateDiffWorkflowShortPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowShortPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, shortPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowShortPeriodNoThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"false");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle1",
workflowIdSuffix1);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
createJobConfigArrivalEvent(properties1, workflowIdSuffix2);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ connectAndAssertWorkflowId(workflowIdSuffix1, newJobConfigArrivalEvent,
helixManager);
+
+ final Properties properties2 =
+ GobblinHelixJobLauncherTest.generateJobProperties(
+ this.baseConfig, "UpdateDiffWorkflowLongPeriodThrottle2",
workflowIdSuffix3);
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent2 =
+ new
NewJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2);
+
properties2.setProperty(GobblinClusterConfigurationKeys.HELIX_JOB_SCHEDULING_THROTTLE_ENABLED_KEY,
"true");
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties2));
+ connectAndAssertWorkflowId(workflowIdSuffix3, newJobConfigArrivalEvent2,
helixManager);
+ }
+ }
+
+ @Test
+ public void testUpdateDiffWorkflowLongPeriodNoThrottle()
+ throws Exception {
+ try (MockedStatic<Instant> mocked = mockStatic(Instant.class,
CALLS_REAL_METHODS)) {
+ mocked.when(Instant::now).thenReturn(beginTime, longPeriod);
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ GobblinHelixJobScheduler jobScheduler = createJobScheduler(helixManager);
Review Comment:
Same here: perhaps you can reuse code, since there is a lot of repetition.
Let's try to stay DRY (do not repeat yourself).
Issue Time Tracking
-------------------
Worklog Id: (was: 865665)
Remaining Estimate: 0h
Time Spent: 10m
> Helix Job scheduler should not try to replace running workflow if within
> configured time
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1840
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1840
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)