prateekm commented on a change in pull request #1434:
URL: https://github.com/apache/samza/pull/1434#discussion_r503535544



##########
File path: 
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
##########
@@ -67,8 +73,12 @@ public void start() {
           int monitorSchedulingJitterInMs = (int) 
(RANDOM.nextInt(schedulingIntervalInMs + 1) * 
(monitorConfig.getSchedulingJitterPercent() / 100.0));
           schedulingIntervalInMs += monitorSchedulingJitterInMs;
           LOGGER.info("Scheduling the monitor: {} to run every {} ms.", 
monitorName, schedulingIntervalInMs);
-          scheduler.schedule(getRunnable(instantiateMonitor(monitorName, 
monitorConfig, metricsRegistry)),
-              schedulingIntervalInMs);
+          // Create a new SchedulerExecutorService for each monitor. This 
ensures that a long running monitor service
+          // does not block another monitor from scheduling/running. A long 
running monitor will not create a backlog
+          // of work for future monitors of same type. A new monitor is 
scheduled only when current work is complete.

Review comment:
       "for future executions of the same monitor. New execution is .." 

##########
File path: 
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
##########
@@ -43,16 +51,14 @@
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SamzaMonitorService.class);
   private static final SecureRandom RANDOM = new SecureRandom();
 
-  private final SchedulingProvider scheduler;

Review comment:
       If this is the only place this interface was being used, let's delete 
the interface + impl.

##########
File path: 
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
##########
@@ -43,16 +51,14 @@
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SamzaMonitorService.class);
   private static final SecureRandom RANDOM = new SecureRandom();
 
-  private final SchedulingProvider scheduler;
   private final SamzaRestConfig config;
   private final MetricsRegistry metricsRegistry;
-
+  private final List<ScheduledExecutorService> scheduledExecutors;
   public SamzaMonitorService(SamzaRestConfig config,
-                             MetricsRegistry metricsRegistry,
-                             SchedulingProvider schedulingProvider) {
+      MetricsRegistry metricsRegistry) {
     this.config = config;
     this.metricsRegistry = metricsRegistry;
-    this.scheduler = schedulingProvider;
+    scheduledExecutors = new ArrayList<>();

Review comment:
       Where is this list updated?

##########
File path: 
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
##########
@@ -43,16 +51,14 @@
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SamzaMonitorService.class);
   private static final SecureRandom RANDOM = new SecureRandom();
 
-  private final SchedulingProvider scheduler;
   private final SamzaRestConfig config;
   private final MetricsRegistry metricsRegistry;
-
+  private final List<ScheduledExecutorService> scheduledExecutors;
   public SamzaMonitorService(SamzaRestConfig config,
-                             MetricsRegistry metricsRegistry,
-                             SchedulingProvider schedulingProvider) {
+      MetricsRegistry metricsRegistry) {
     this.config = config;
     this.metricsRegistry = metricsRegistry;
-    this.scheduler = schedulingProvider;
+    scheduledExecutors = new ArrayList<>();

Review comment:
       this.scheduledExecutors for consistency.

##########
File path: 
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
##########
@@ -67,8 +73,12 @@ public void start() {
           int monitorSchedulingJitterInMs = (int) 
(RANDOM.nextInt(schedulingIntervalInMs + 1) * 
(monitorConfig.getSchedulingJitterPercent() / 100.0));
           schedulingIntervalInMs += monitorSchedulingJitterInMs;
           LOGGER.info("Scheduling the monitor: {} to run every {} ms.", 
monitorName, schedulingIntervalInMs);
-          scheduler.schedule(getRunnable(instantiateMonitor(monitorName, 
monitorConfig, metricsRegistry)),
-              schedulingIntervalInMs);
+          // Create a new SchedulerExecutorService for each monitor. This 
ensures that a long running monitor service
+          // does not block another monitor from scheduling/running. A long 
running monitor will not create a backlog
+          // of work for future monitors of same type. A new monitor is 
scheduled only when current work is complete.
+          getScheduler()

Review comment:
       s/getScheduler/createScheduler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to