This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4f9dd3cf6 [GOBBLIN-2108] fix quartz not able to create non static
inner class, create a separate scheduler for reminder jobs (#3998)
4f9dd3cf6 is described below
commit 4f9dd3cf665ec9a87edbd0f14baa8894e9a66b3c
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jul 15 08:52:09 2024 -0700
[GOBBLIN-2108] fix quartz not able to create non static inner class, create
a separate scheduler for reminder jobs (#3998)
* fix quartz not able to create non static inner class, create a separate
scheduler for reminder jobs
---
.../apache/gobblin/scheduler/SchedulerService.java | 36 ++-----
.../modules/core/GobblinServiceGuiceModule.java | 6 +-
.../orchestration/DagActionReminderScheduler.java | 103 +++++++++++++--------
.../DagActionReminderSchedulerTest.java | 63 ++++++++++++-
4 files changed, 133 insertions(+), 75 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
index c19cf22d0..fbbbb614f 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
@@ -21,7 +21,6 @@ import java.util.Properties;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
-import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;
import com.google.common.base.Optional;
@@ -44,52 +43,35 @@ import org.apache.gobblin.util.PropertiesUtils;
@Singleton
public class SchedulerService extends AbstractIdleService {
- private SchedulerFactory schedulerFactory;
- // Refers to traditional job scheduler
@Getter
private Scheduler scheduler;
private final boolean waitForJobCompletion;
private final Optional<Properties> quartzProps;
- public SchedulerService(boolean waitForJobCompletion, Optional<Properties>
quartzConfig,
- StdSchedulerFactory schedulerFactory) {
+ public SchedulerService(boolean waitForJobCompletion, Optional<Properties>
quartzConfig) {
this.waitForJobCompletion = waitForJobCompletion;
this.quartzProps = quartzConfig;
- if (this.schedulerFactory == null) {
- this.schedulerFactory = new StdSchedulerFactory();
- }
- else {
- this.schedulerFactory = schedulerFactory;
- }
}
- public SchedulerService(Properties props, StdSchedulerFactory
schedulerFactory) {
+ public SchedulerService(Properties props) {
this(Boolean.parseBoolean(
props.getProperty(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY,
-
ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION)),
- Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props,
Optional.of("org.quartz."))), schedulerFactory);
- }
-
- public SchedulerService(Properties props) {
- this(props, null);
- }
-
- public SchedulerService(Config cfg) {
- this(cfg, null);
+ ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION)),
+ Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props,
Optional.of("org.quartz."))));
}
@Inject
- public SchedulerService(Config cfg, StdSchedulerFactory schedulerFactory) {
+ public SchedulerService(Config cfg) {
this(cfg.hasPath(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) ?
cfg.getBoolean(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) :
Boolean.parseBoolean(ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION),
- Optional.of(ConfigUtils.configToProperties(cfg, "org.quartz.")),
schedulerFactory);
+ Optional.of(ConfigUtils.configToProperties(cfg, "org.quartz.")));
}
@Override protected void startUp() throws SchedulerException {
- if (this.quartzProps.isPresent() && this.quartzProps.get().size() > 0) {
- // Cast to StdSchedulerFactory to reference initialization method that
generic interface does not provide
- ((StdSchedulerFactory)
schedulerFactory).initialize(this.quartzProps.get());
+ StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+ if (this.quartzProps.isPresent() && !this.quartzProps.get().isEmpty()) {
+ schedulerFactory.initialize(this.quartzProps.get());
}
this.scheduler = schedulerFactory.getScheduler();
this.scheduler.start();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index de53c8323..6365239ec 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.core;
import java.util.Objects;
import org.apache.helix.HelixManager;
-import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,8 +76,8 @@ import
org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
import
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
-import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -200,9 +199,6 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(FlowLaunchHandler.class);
}
- // Note: only one SchedulerFactory instance should exist per JVM
- binder.bind(StdSchedulerFactory.class).in(Singleton.class);
-
OptionalBinder.newOptionalBinder(binder, DagManagement.class);
OptionalBinder.newOptionalBinder(binder, DagTaskStream.class);
OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 7109f343e..63ebe9cd9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.util.Date;
+import java.util.Properties;
import java.util.function.Supplier;
import org.quartz.Job;
@@ -33,9 +34,14 @@ import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.spi.JobFactory;
+import org.quartz.spi.TriggerFiredBundle;
+
+import com.google.common.annotations.VisibleForTesting;
import javax.inject.Inject;
import javax.inject.Singleton;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -56,19 +62,29 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
@Slf4j
@Singleton
public class DagActionReminderScheduler {
+ public static final String DAG_ACTION_REMINDER_SCHEDULER_NAME =
"DagActionReminderScheduler";
public static final String RetryReminderKeyGroup = "RetryReminder";
public static final String DeadlineReminderKeyGroup = "DeadlineReminder";
- private final Scheduler quartzScheduler;
+ @VisibleForTesting
+ final Scheduler quartzScheduler;
private final DagManagement dagManagement;
@Inject
- public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory,
DagManagement dagManagement)
- throws SchedulerException {
+ public DagActionReminderScheduler(DagManagement dagManagement) throws
SchedulerException {
// Creates a new Scheduler to be used solely for the DagProc reminders
- this.quartzScheduler = schedulerFactory.getScheduler();
+ this.quartzScheduler = createScheduler();
+ this.quartzScheduler.start();
+ this.quartzScheduler.setJobFactory(new ReminderJobFactory());
this.dagManagement = dagManagement;
}
+ private Scheduler createScheduler() throws SchedulerException {
+ Properties properties = new Properties();
+ properties.setProperty("org.quartz.scheduler.instanceName",
DAG_ACTION_REMINDER_SCHEDULER_NAME);
+ properties.setProperty("org.quartz.threadPool.threadCount", "10");
+ return new StdSchedulerFactory(properties).getScheduler();
+ }
+
/**
* Uses a dagAction & reminder duration in milliseconds to create a
reminder job that will fire
* `reminderDurationMillis` after the current time
@@ -94,46 +110,13 @@ public class DagActionReminderScheduler {
}
}
- /**
- * Static class used to store information regarding a pending dagAction that
needs to be revisited at a later time
- * by {@link DagManagement} interface to re-attempt a lease on if it has not
been completed by the previous owner.
- * These jobs are scheduled and used by the {@link
DagActionReminderScheduler}.
- */
- public class ReminderJob implements Job {
- public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
- public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
-
- @Override
- public void execute(JobExecutionContext context) {
- // Get properties from the trigger to create a dagAction
- JobDataMap jobDataMap = context.getMergedJobDataMap();
- String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
- String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
- String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
- long flowExecutionId =
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
- long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
-
- DagActionStore.LeaseParams reminderLeaseParams = new
DagActionStore.LeaseParams(
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType),
- true, eventTimeMillis);
- log.info("DagProc reminder triggered for dagAction event: {}",
reminderLeaseParams);
-
- try {
- dagManagement.addDagAction(reminderLeaseParams);
- } catch (IOException e) {
- log.error("Failed to add DagAction event to DagManagement. dagAction
event: {}", reminderLeaseParams);
- }
- }
- }
-
/**
* Creates a key for the reminder job by concatenating all dagAction fields
and the eventTime of the dagAction.
- *
+ * <p>
* This ensures unique keys for multiple instances of the same action on the
same flow execution that originate more
* than 'epsilon' apart. {@link MultiActiveLeaseArbiter} uses the eventTime
to distinguish these distinct occurrences
* of the same action. This is necessary to prevent insertion failures due
to previous reminders.
- *
+ * <p>
* Applicable only for KILL and RESUME actions; duplication for other
actions is an error.
*/
public static String createDagActionReminderKey(DagActionStore.LeaseParams
leaseParams) {
@@ -192,4 +175,46 @@ public class DagActionReminderScheduler {
.startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
.build();
}
+
+ public class ReminderJobFactory implements JobFactory {
+ @Override
+ public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) {
+ return new ReminderJob(dagManagement);
+ }
+ }
+
+ /**
+ * These jobs are scheduled and used by the {@link
DagActionReminderScheduler}.
+ * When the reminder deadline is completed, these jobs are invoked by Quartz
scheduler.
+ * They create a {@link DagActionStore.LeaseParams} and forward them to
{@link DagManagement} for further processing.
+ */
+ @RequiredArgsConstructor
+ public static class ReminderJob implements Job {
+ public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+ public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
+ private final DagManagement dagManagement;
+
+ @Override
+ public void execute(JobExecutionContext context) {
+ // Get properties from the trigger to create a dagAction
+ JobDataMap jobDataMap = context.getMergedJobDataMap();
+ String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+ long flowExecutionId =
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
+ long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
+
+ DagActionStore.LeaseParams reminderLeaseParams = new
DagActionStore.LeaseParams(
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, dagActionType),
+ true, eventTimeMillis);
+ log.info("DagProc reminder triggered for dagAction event: {}",
reminderLeaseParams);
+
+ try {
+ dagManagement.addDagAction(reminderLeaseParams);
+ } catch (IOException e) {
+ log.error("Failed to add DagAction event to DagManagement. dagAction
event: {}", reminderLeaseParams);
+ }
+ }
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 623dd7ae1..cb519c556 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -17,14 +17,21 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.function.Supplier;
+import org.mockito.Mockito;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
import org.quartz.TriggerUtils;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.spi.OperableTrigger;
@@ -57,14 +64,13 @@ public class DagActionReminderSchedulerTest {
DagActionStore.LeaseParams launchLeaseParams = new
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis);
DagActionStore.LeaseParams launchLeaseParams2 = new
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis2);
DagActionReminderScheduler dagActionReminderScheduler;
+ DagManagement dagManagement = mock(DagManagement.class);
+ private static boolean testJobRan = false;
@BeforeClass
private void setup() throws Exception {
- StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
- schedulerFactory.getScheduler();
- DagManagement dagManagement = mock(DagManagement.class);
doNothing().when(dagManagement).addDagAction(any());
- this.dagActionReminderScheduler = new
DagActionReminderScheduler(schedulerFactory, dagManagement);
+ this.dagActionReminderScheduler = new
DagActionReminderScheduler(this.dagManagement);
}
@Test
@@ -109,4 +115,53 @@ public class DagActionReminderSchedulerTest {
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams,
true);
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2,
true);
}
+
+ // Test multiple schedulers can co-exist and run their jobs of different
types
+ @Test
+ public void testMultipleSchedules() throws SchedulerException,
InterruptedException, IOException {
+ JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(launchLeaseParams, false);
+ Scheduler scheduler1 = this.dagActionReminderScheduler.quartzScheduler;
+ Scheduler scheduler2 = new StdSchedulerFactory().getScheduler();
+
+ Assert.assertNotSame(scheduler1, scheduler2);
+
+ this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 100L,
false);
+
+
Assert.assertTrue(dagActionReminderScheduler.quartzScheduler.checkExists(jobDetail.getKey()));
+
+ Thread.sleep(200L);
+
+ // verify that the quartz job ran
+ Mockito.verify(this.dagManagement, Mockito.times(1)).addDagAction(any());
+ // verify that the quartz job cleaned itself without throwing any exception
+
Assert.assertFalse(dagActionReminderScheduler.quartzScheduler.checkExists(jobDetail.getKey()));
+
+ scheduler2.start();
+
+ JobDetail job = JobBuilder.newJob(TestJob.class)
+ .withIdentity("myJob", "group1")
+ .build();
+
+ Trigger trigger = TriggerBuilder.newTrigger()
+ .withIdentity("myTrigger", "group1")
+ .startAt(new Date(System.currentTimeMillis() + 100L))
+ .build();
+
+ scheduler2.scheduleJob(job, trigger);
+
+ Assert.assertTrue(scheduler2.checkExists(job.getKey()));
+ Thread.sleep(200L);
+
+ // verify that the quartz job ran
+ Assert.assertTrue(DagActionReminderSchedulerTest.testJobRan);
+ // verify that the quartz job cleaned itself without throwing any exception
+ Assert.assertFalse(scheduler2.checkExists(jobDetail.getKey()));
+ }
+
+ public static class TestJob implements Job {
+ @Override
+ public void execute(JobExecutionContext context) {
+ DagActionReminderSchedulerTest.testJobRan = true;
+ }
+ }
}