This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f9dd3cf6 [GOBBLIN-2108] fix quartz not able to create non static 
inner class, create a separa… (#3998)
4f9dd3cf6 is described below

commit 4f9dd3cf665ec9a87edbd0f14baa8894e9a66b3c
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jul 15 08:52:09 2024 -0700

    [GOBBLIN-2108] fix quartz not able to create non static inner class, create 
a separa… (#3998)
    
    * fix quartz not able to create non static inner class, create a separate 
scheduler for reminder jobs
---
 .../apache/gobblin/scheduler/SchedulerService.java |  36 ++-----
 .../modules/core/GobblinServiceGuiceModule.java    |   6 +-
 .../orchestration/DagActionReminderScheduler.java  | 103 +++++++++++++--------
 .../DagActionReminderSchedulerTest.java            |  63 ++++++++++++-
 4 files changed, 133 insertions(+), 75 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
index c19cf22d0..fbbbb614f 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
@@ -21,7 +21,6 @@ import java.util.Properties;
 
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
-import org.quartz.SchedulerFactory;
 import org.quartz.impl.StdSchedulerFactory;
 
 import com.google.common.base.Optional;
@@ -44,52 +43,35 @@ import org.apache.gobblin.util.PropertiesUtils;
 @Singleton
 public class SchedulerService extends AbstractIdleService {
 
-  private SchedulerFactory schedulerFactory;
-  // Refers to traditional job scheduler
   @Getter
   private Scheduler scheduler;
   private final boolean waitForJobCompletion;
   private final Optional<Properties> quartzProps;
 
-  public SchedulerService(boolean waitForJobCompletion, Optional<Properties> 
quartzConfig,
-      StdSchedulerFactory schedulerFactory) {
+  public SchedulerService(boolean waitForJobCompletion, Optional<Properties> 
quartzConfig) {
     this.waitForJobCompletion = waitForJobCompletion;
     this.quartzProps = quartzConfig;
-    if (this.schedulerFactory == null) {
-      this.schedulerFactory = new StdSchedulerFactory();
-    }
-    else {
-      this.schedulerFactory = schedulerFactory;
-    }
   }
 
-  public SchedulerService(Properties props, StdSchedulerFactory 
schedulerFactory) {
+  public SchedulerService(Properties props) {
     this(Boolean.parseBoolean(
             
props.getProperty(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY,
-                              
ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION)),
-        Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props, 
Optional.of("org.quartz."))), schedulerFactory);
-  }
-
-  public SchedulerService(Properties props) {
-    this(props, null);
-  }
-
-  public SchedulerService(Config cfg) {
-    this(cfg, null);
+                ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION)),
+        Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props, 
Optional.of("org.quartz."))));
   }
 
   @Inject
-  public SchedulerService(Config cfg, StdSchedulerFactory schedulerFactory) {
+  public SchedulerService(Config cfg) {
     this(cfg.hasPath(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) ?
             
cfg.getBoolean(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) :
             
Boolean.parseBoolean(ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION),
-        Optional.of(ConfigUtils.configToProperties(cfg, "org.quartz.")), 
schedulerFactory);
+        Optional.of(ConfigUtils.configToProperties(cfg, "org.quartz.")));
   }
 
   @Override protected void startUp() throws SchedulerException  {
-    if (this.quartzProps.isPresent() && this.quartzProps.get().size() > 0) {
-      // Cast to StdSchedulerFactory to reference initialization method that 
generic interface does not provide
-      ((StdSchedulerFactory) 
schedulerFactory).initialize(this.quartzProps.get());
+    StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+    if (this.quartzProps.isPresent() && !this.quartzProps.get().isEmpty()) {
+      schedulerFactory.initialize(this.quartzProps.get());
     }
     this.scheduler = schedulerFactory.getScheduler();
     this.scheduler.start();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index de53c8323..6365239ec 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.core;
 import java.util.Objects;
 
 import org.apache.helix.HelixManager;
-import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,8 +76,8 @@ import 
org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
 import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
 import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
 import 
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
-import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -200,9 +199,6 @@ public class GobblinServiceGuiceModule implements Module {
       binder.bind(FlowLaunchHandler.class);
     }
 
-    // Note: only one SchedulerFactory instance should exist per JVM
-    binder.bind(StdSchedulerFactory.class).in(Singleton.class);
-
     OptionalBinder.newOptionalBinder(binder, DagManagement.class);
     OptionalBinder.newOptionalBinder(binder, DagTaskStream.class);
     OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 7109f343e..63ebe9cd9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.Properties;
 import java.util.function.Supplier;
 
 import org.quartz.Job;
@@ -33,9 +34,14 @@ import org.quartz.Trigger;
 import org.quartz.TriggerBuilder;
 import org.quartz.TriggerKey;
 import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.spi.JobFactory;
+import org.quartz.spi.TriggerFiredBundle;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -56,19 +62,29 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 @Slf4j
 @Singleton
 public class DagActionReminderScheduler {
+  public static final String DAG_ACTION_REMINDER_SCHEDULER_NAME = 
"DagActionReminderScheduler";
   public static final String RetryReminderKeyGroup = "RetryReminder";
   public static final String DeadlineReminderKeyGroup = "DeadlineReminder";
-  private final Scheduler quartzScheduler;
+  @VisibleForTesting
+  final Scheduler quartzScheduler;
   private final DagManagement dagManagement;
 
   @Inject
-  public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory, 
DagManagement dagManagement)
-      throws SchedulerException {
+  public DagActionReminderScheduler(DagManagement dagManagement) throws 
SchedulerException {
     // Creates a new Scheduler to be used solely for the DagProc reminders
-    this.quartzScheduler = schedulerFactory.getScheduler();
+    this.quartzScheduler = createScheduler();
+    this.quartzScheduler.start();
+    this.quartzScheduler.setJobFactory(new ReminderJobFactory());
     this.dagManagement = dagManagement;
   }
 
+  private Scheduler createScheduler() throws SchedulerException {
+    Properties properties = new Properties();
+    properties.setProperty("org.quartz.scheduler.instanceName", 
DAG_ACTION_REMINDER_SCHEDULER_NAME);
+    properties.setProperty("org.quartz.threadPool.threadCount", "10");
+    return new StdSchedulerFactory(properties).getScheduler();
+  }
+
   /**
    *  Uses a dagAction & reminder duration in milliseconds to create a 
reminder job that will fire
    *  `reminderDurationMillis` after the current time
@@ -94,46 +110,13 @@ public class DagActionReminderScheduler {
     }
   }
 
-  /**
-   * Static class used to store information regarding a pending dagAction that 
needs to be revisited at a later time
-   * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
-   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
-   */
-  public class ReminderJob implements Job {
-    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
-    public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
-
-    @Override
-    public void execute(JobExecutionContext context) {
-      // Get properties from the trigger to create a dagAction
-      JobDataMap jobDataMap = context.getMergedJobDataMap();
-      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
-      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
-      long flowExecutionId = 
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
-      DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
-      long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
-
-      DagActionStore.LeaseParams reminderLeaseParams = new 
DagActionStore.LeaseParams(
-          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType),
-          true, eventTimeMillis);
-      log.info("DagProc reminder triggered for dagAction event: {}", 
reminderLeaseParams);
-
-      try {
-        dagManagement.addDagAction(reminderLeaseParams);
-      } catch (IOException e) {
-        log.error("Failed to add DagAction event to DagManagement. dagAction 
event: {}", reminderLeaseParams);
-      }
-    }
-  }
-
   /**
    * Creates a key for the reminder job by concatenating all dagAction fields 
and the eventTime of the dagAction.
-   *
+   * <p>
    * This ensures unique keys for multiple instances of the same action on the 
same flow execution that originate more
    * than 'epsilon' apart. {@link MultiActiveLeaseArbiter} uses the eventTime 
to distinguish these distinct occurrences
    * of the same action. This is necessary to prevent insertion failures due 
to previous reminders.
-   *
+   * <p>
    * Applicable only for KILL and RESUME actions; duplication for other 
actions is an error.
    */
   public static String createDagActionReminderKey(DagActionStore.LeaseParams 
leaseParams) {
@@ -192,4 +175,46 @@ public class DagActionReminderScheduler {
         .startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
         .build();
   }
+
+  public class ReminderJobFactory implements JobFactory {
+    @Override
+    public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) {
+      return new ReminderJob(dagManagement);
+    }
+  }
+
+  /**
+   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
+   * When the reminder deadline is completed, these jobs are invoked by Quartz 
scheduler.
+   * They create a {@link DagActionStore.LeaseParams} and forward them to 
{@link DagManagement} for further processing.
+   */
+  @RequiredArgsConstructor
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+    public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
+    private final DagManagement dagManagement;
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getMergedJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      long flowExecutionId = 
jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = 
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
+      long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);
+
+      DagActionStore.LeaseParams reminderLeaseParams = new 
DagActionStore.LeaseParams(
+          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
jobName, dagActionType),
+          true, eventTimeMillis);
+      log.info("DagProc reminder triggered for dagAction event: {}", 
reminderLeaseParams);
+
+      try {
+        dagManagement.addDagAction(reminderLeaseParams);
+      } catch (IOException e) {
+        log.error("Failed to add DagAction event to DagManagement. dagAction 
event: {}", reminderLeaseParams);
+      }
+    }
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
index 623dd7ae1..cb519c556 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -17,14 +17,21 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.io.IOException;
 import java.util.Date;
 import java.util.List;
 import java.util.function.Supplier;
 
+import org.mockito.Mockito;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
 import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
 import org.quartz.TriggerUtils;
 import org.quartz.impl.StdSchedulerFactory;
 import org.quartz.spi.OperableTrigger;
@@ -57,14 +64,13 @@ public class DagActionReminderSchedulerTest {
   DagActionStore.LeaseParams launchLeaseParams = new 
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis);
   DagActionStore.LeaseParams launchLeaseParams2 = new 
DagActionStore.LeaseParams(launchDagAction, eventTimeMillis2);
   DagActionReminderScheduler dagActionReminderScheduler;
+  DagManagement dagManagement = mock(DagManagement.class);
+  private static boolean testJobRan = false;
 
   @BeforeClass
   private void setup() throws Exception {
-    StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
-    schedulerFactory.getScheduler();
-    DagManagement dagManagement = mock(DagManagement.class);
     doNothing().when(dagManagement).addDagAction(any());
-    this.dagActionReminderScheduler = new 
DagActionReminderScheduler(schedulerFactory, dagManagement);
+    this.dagActionReminderScheduler = new 
DagActionReminderScheduler(this.dagManagement);
   }
 
   @Test
@@ -109,4 +115,53 @@ public class DagActionReminderSchedulerTest {
     this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams, 
true);
     this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, 
true);
   }
+
+  // Test multiple schedulers can co-exist and run their jobs of different 
types
+  @Test
+  public void testMultipleSchedules() throws SchedulerException, 
InterruptedException, IOException {
+    JobDetail jobDetail = 
DagActionReminderScheduler.createReminderJobDetail(launchLeaseParams, false);
+    Scheduler scheduler1 = this.dagActionReminderScheduler.quartzScheduler;
+    Scheduler scheduler2 = new StdSchedulerFactory().getScheduler();
+
+    Assert.assertNotSame(scheduler1, scheduler2);
+
+    this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 100L, 
false);
+
+    
Assert.assertTrue(dagActionReminderScheduler.quartzScheduler.checkExists(jobDetail.getKey()));
+
+    Thread.sleep(200L);
+
+    // verify that the quartz job ran
+    Mockito.verify(this.dagManagement, Mockito.times(1)).addDagAction(any());
+    // verify that the quartz job cleaned itself without throwing any exception
+    
Assert.assertFalse(dagActionReminderScheduler.quartzScheduler.checkExists(jobDetail.getKey()));
+
+    scheduler2.start();
+
+    JobDetail job = JobBuilder.newJob(TestJob.class)
+        .withIdentity("myJob", "group1")
+        .build();
+
+    Trigger trigger = TriggerBuilder.newTrigger()
+        .withIdentity("myTrigger", "group1")
+        .startAt(new Date(System.currentTimeMillis() + 100L))
+        .build();
+
+    scheduler2.scheduleJob(job, trigger);
+
+    Assert.assertTrue(scheduler2.checkExists(job.getKey()));
+    Thread.sleep(200L);
+
+    // verify that the quartz job ran
+    Assert.assertTrue(DagActionReminderSchedulerTest.testJobRan);
+    // verify that the quartz job cleaned itself without throwing any exception
+    Assert.assertFalse(scheduler2.checkExists(jobDetail.getKey()));
+  }
+
+  public static class TestJob implements Job {
+    @Override
+    public void execute(JobExecutionContext context) {
+      DagActionReminderSchedulerTest.testJobRan = true;
+    }
+  }
 }

Reply via email to