This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 13b5303  SAMZA-2595: Updated MonitorService to use separate 
ScheduleExecutor for each monitor (#1434)
13b5303 is described below

commit 13b5303ae1ee76a58d2ebe1dfd2c1df476e7e8b5
Author: shekhars-li <[email protected]>
AuthorDate: Mon Oct 12 18:55:23 2020 -0700

    SAMZA-2595: Updated MonitorService to use separate ScheduleExecutor for 
each monitor (#1434)
    
    Co-authored-by: Shekhar Sharma <[email protected]>
---
 .../org/apache/samza/monitor/MonitorConfig.java    |  6 ++-
 .../apache/samza/monitor/SamzaMonitorService.java  | 43 ++++++++++++++++++----
 .../ScheduledExecutorSchedulingProvider.java       | 40 --------------------
 .../apache/samza/monitor/SchedulingProvider.java   | 30 ---------------
 .../org/apache/samza/rest/SamzaRestService.java    | 22 ++---------
 .../apache/samza/monitor/TestMonitorService.java   | 36 +++++++++++++-----
 .../monitor/mock/InstantSchedulingProvider.java    | 34 -----------------
 7 files changed, 70 insertions(+), 141 deletions(-)

diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
index ced6fc1..2291258 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
@@ -44,7 +44,11 @@ public class MonitorConfig extends MapConfig {
 
   private static final String CONFIG_SCHEDULING_JITTER_PERCENT = 
"scheduling.jitter.percent";
 
-  private static final int DEFAULT_SCHEDULING_JITTER_PERCENT = 0;
+  // Default scheduling jitter set to 100 to ensure every monitor is scheduled 
with an initial jitter.
+  // The change SAMZA-2595 creates a new scheduler/thread for every monitor. 
Jitter ensures that there is a delay in
+  // scheduling of every monitor to ensure all the monitors do not start at 
the same time, to avoid spike on host
+  // and to not overwhelm the metric reporting service.
+  private static final int DEFAULT_SCHEDULING_JITTER_PERCENT = 100;
 
   public MonitorConfig(Config config) {
     super(config);
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
index 5558917..6d84143 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
@@ -18,9 +18,18 @@
  */
 package org.apache.samza.monitor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.rest.SamzaRestConfig;
@@ -43,16 +52,14 @@ public class SamzaMonitorService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SamzaMonitorService.class);
   private static final SecureRandom RANDOM = new SecureRandom();
 
-  private final SchedulingProvider scheduler;
   private final SamzaRestConfig config;
   private final MetricsRegistry metricsRegistry;
-
+  private final List<ScheduledExecutorService> scheduledExecutors;
   public SamzaMonitorService(SamzaRestConfig config,
-                             MetricsRegistry metricsRegistry,
-                             SchedulingProvider schedulingProvider) {
+      MetricsRegistry metricsRegistry) {
     this.config = config;
     this.metricsRegistry = metricsRegistry;
-    this.scheduler = schedulingProvider;
+    this.scheduledExecutors = new ArrayList<>();
   }
 
   public void start() {
@@ -67,8 +74,10 @@ public class SamzaMonitorService {
           int monitorSchedulingJitterInMs = (int) 
(RANDOM.nextInt(schedulingIntervalInMs + 1) * 
(monitorConfig.getSchedulingJitterPercent() / 100.0));
           schedulingIntervalInMs += monitorSchedulingJitterInMs;
           LOGGER.info("Scheduling the monitor: {} to run every {} ms.", 
monitorName, schedulingIntervalInMs);
-          scheduler.schedule(getRunnable(instantiateMonitor(monitorName, 
monitorConfig, metricsRegistry)),
-              schedulingIntervalInMs);
+          // Create a new SchedulerExecutorService for each monitor. This 
ensures that a long running monitor service
+          // does not block another monitor from scheduling/running. A long 
running monitor will not create a backlog
+          // of work for future execution of the same monitor. A new monitor 
is scheduled only when current work is complete.
+          createSchedulerAndScheduleMonitor(monitorName, monitorConfig, 
schedulingIntervalInMs);
         } else {
           // When MonitorFactoryClass is not defined in the config, ignore the 
monitor config
           LOGGER.warn("Not scheduling the monitor: {} to run, since monitor 
factory class is not set in config.", monitorName);
@@ -81,7 +90,7 @@ public class SamzaMonitorService {
   }
 
   public void stop() {
-    this.scheduler.stop();
+    scheduledExecutors.forEach(ExecutorService::shutdown);
   }
 
   private Runnable getRunnable(final Monitor monitor) {
@@ -100,4 +109,22 @@ public class SamzaMonitorService {
       }
     };
   }
+
+  /**
+   * Creates a ScheduledThreadPoolExecutor with core pool size 1 and schedules 
the monitor to run every schedulingIntervalInMs
+   */
+  @VisibleForTesting
+  public void createSchedulerAndScheduleMonitor(String monitorName, 
MonitorConfig monitorConfig, long schedulingIntervalInMs)
+      throws InstantiationException {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("MonitorThread-%d")
+        .build();
+
+    ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(1, threadFactory);
+    scheduledExecutors.add(scheduledExecutorService);
+
+    scheduledExecutorService
+        .scheduleAtFixedRate(getRunnable(instantiateMonitor(monitorName, 
monitorConfig, metricsRegistry)),
+            0, schedulingIntervalInMs, TimeUnit.MILLISECONDS);
+  }
 }
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
 
b/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
deleted file mode 100644
index e1f0711..0000000
--- 
a/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.monitor;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-public class ScheduledExecutorSchedulingProvider implements SchedulingProvider 
{
-
-  private final ScheduledExecutorService scheduler;
-
-  public ScheduledExecutorSchedulingProvider(ScheduledExecutorService 
scheduler) {
-    this.scheduler = scheduler;
-  }
-
-  public void schedule(Runnable runnable, int interval) {
-    this.scheduler.scheduleAtFixedRate(runnable, 0, interval, MILLISECONDS);
-  }
-
-  public void stop() {
-    this.scheduler.shutdownNow();
-  }
-}
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
deleted file mode 100644
index d68f6c1..0000000
--- a/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.monitor;
-
-/**
- * Provides scheduling functionality to the SamzaMonitorService.
- */
-public interface SchedulingProvider {
-  /* Schedule a the given Runnable to run() every INTERVAL ms. */
-  void schedule(Runnable runnable, int intervalMs);
-
-  /* Stop any future executions of any scheduled tasks. */
-  void stop();
-}
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
index c5d6af7..d48d1ab 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
@@ -18,9 +18,7 @@
  */
 package org.apache.samza.rest;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Map;
-import java.util.concurrent.ThreadFactory;
 import joptsimple.OptionSet;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
@@ -29,7 +27,6 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.monitor.SamzaMonitorService;
-import org.apache.samza.monitor.ScheduledExecutorSchedulingProvider;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.Util;
@@ -42,8 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.servlet.Servlet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 
 
 /**
@@ -88,7 +83,7 @@ public class SamzaRestService {
    */
   public static void main(String[] args)
       throws Exception {
-    ScheduledExecutorSchedulingProvider schedulingProvider = null;
+    SamzaMonitorService monitorService = null;
     try {
       SamzaRestConfig config = parseConfig(args);
       ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap();
@@ -104,24 +99,15 @@ public class SamzaRestService {
       ServletContainer container = new ServletContainer(samzaRestApplication);
       restService.addServlet(container, "/*");
 
-      // Schedule monitors to run
-      ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
-                                                              
.setNameFormat("MonitorThread-%d")
-                                                              .build();
-      ScheduledExecutorService schedulingService = 
Executors.newScheduledThreadPool(1, threadFactory);
-      schedulingProvider = new 
ScheduledExecutorSchedulingProvider(schedulingService);
-      SamzaMonitorService monitorService = new SamzaMonitorService(config,
-          metricsRegistry,
-          schedulingProvider);
+      monitorService = new SamzaMonitorService(config, metricsRegistry);
       monitorService.start();
 
       restService.runBlocking();
-      monitorService.stop();
     } catch (Throwable t) {
       log.error("Exception in main.", t);
     } finally {
-      if (schedulingProvider != null) {
-        schedulingProvider.stop();
+      if (monitorService != null) {
+        monitorService.stop();
       }
     }
   }
diff --git 
a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java 
b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
index b2924fc..a4ed3d2 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
@@ -26,7 +26,6 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.monitor.mock.DummyMonitorFactory;
 import org.apache.samza.monitor.mock.ExceptionThrowingMonitorFactory;
-import org.apache.samza.monitor.mock.InstantSchedulingProvider;
 import org.apache.samza.monitor.mock.MockMonitorFactory;
 import org.apache.samza.rest.SamzaRestConfig;
 import org.apache.samza.util.NoOpMetricsRegistry;
@@ -40,6 +39,7 @@ import org.mockito.Mockito;
 
 import static junit.framework.TestCase.assertTrue;
 import static 
org.apache.samza.monitor.MonitorConfig.CONFIG_MONITOR_FACTORY_CLASS;
+import static org.apache.samza.monitor.MonitorLoader.instantiateMonitor;
 import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -96,8 +96,7 @@ public class TestMonitorService {
                         
ExceptionThrowingMonitorFactory.class.getCanonicalName());
     SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
     SamzaMonitorService monitorService = new SamzaMonitorService(config,
-                                                                 
METRICS_REGISTRY,
-                                                                 new 
InstantSchedulingProvider());
+                                                                 
METRICS_REGISTRY);
 
     // This will throw if the exception isn't caught within the provider.
     monitorService.start();
@@ -115,9 +114,28 @@ public class TestMonitorService {
                                                     
MockMonitorFactory.class.getCanonicalName());
 
     SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
-    SamzaMonitorService monitorService = new SamzaMonitorService(config,
-                                                                 
METRICS_REGISTRY,
-                                                                 new 
InstantSchedulingProvider());
+
+    class SamzaMonitorServiceTest extends SamzaMonitorService {
+      MetricsRegistry metricsRegistry;
+      public SamzaMonitorServiceTest(SamzaRestConfig config, MetricsRegistry 
metricsRegistry) {
+        super(config, metricsRegistry);
+        this.metricsRegistry = metricsRegistry;
+      }
+
+      @Override
+      public void createSchedulerAndScheduleMonitor(String monitorName, 
MonitorConfig monitorConfig, long schedulingIntervalInMs) {
+        try {
+          // immediately run monitor, without scheduling
+          instantiateMonitor(monitorName, monitorConfig, 
metricsRegistry).monitor();
+        } catch (Exception e) {
+          fail();
+        }
+      }
+    }
+
+    SamzaMonitorService monitorService = new SamzaMonitorServiceTest(config,
+        METRICS_REGISTRY);
+
     try {
       monitorService.start();
     } catch (Exception e) {
@@ -134,8 +152,7 @@ public class TestMonitorService {
                                                     "RandomClassName");
     SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
     SamzaMonitorService monitorService = new SamzaMonitorService(config,
-                                                                 
METRICS_REGISTRY,
-                                                                 new 
InstantSchedulingProvider());
+                                                                 
METRICS_REGISTRY);
     monitorService.start();
   }
 
@@ -143,7 +160,6 @@ public class TestMonitorService {
   public void testScheduledExecutorSchedulingProvider() {
     // Test that the monitor is scheduled by the 
ScheduledExecutorSchedulingProvider
     ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
-    ScheduledExecutorSchedulingProvider provider = new 
ScheduledExecutorSchedulingProvider(executorService);
 
     // notifyingMonitor.monitor() should be called repeatedly.
     final CountDownLatch wasCalledLatch = new CountDownLatch(3);
@@ -167,7 +183,7 @@ public class TestMonitorService {
     };
 
     // monitor should get called every 1ms, so if await() misses the first 
call, there will be more.
-    provider.schedule(runnableMonitor, 1);
+    executorService.scheduleAtFixedRate(runnableMonitor, 0, 1, 
TimeUnit.MILLISECONDS);
 
     try {
       assertTrue(wasCalledLatch.await(5L, TimeUnit.SECONDS));
diff --git 
a/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
 
b/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
deleted file mode 100644
index 5c06911..0000000
--- 
a/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.monitor.mock;
-
-import org.apache.samza.monitor.SchedulingProvider;
-
-/**
- * Instead of scheduling a monitor to run, just runs it ASAP.
- */
-public class InstantSchedulingProvider implements SchedulingProvider {
-
-  public void schedule(Runnable runnableMonitor, int interval) {
-    runnableMonitor.run();
-  }
-
-  // Nothing to stop because no deferred task was started
-  public void stop() {}
-}

Reply via email to