This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 13b5303 SAMZA-2595: Updated MonitorService to use separate
ScheduleExecutor for each monitor (#1434)
13b5303 is described below
commit 13b5303ae1ee76a58d2ebe1dfd2c1df476e7e8b5
Author: shekhars-li <[email protected]>
AuthorDate: Mon Oct 12 18:55:23 2020 -0700
SAMZA-2595: Updated MonitorService to use separate ScheduleExecutor for
each monitor (#1434)
Co-authored-by: Shekhar Sharma <[email protected]>
---
.../org/apache/samza/monitor/MonitorConfig.java | 6 ++-
.../apache/samza/monitor/SamzaMonitorService.java | 43 ++++++++++++++++++----
.../ScheduledExecutorSchedulingProvider.java | 40 --------------------
.../apache/samza/monitor/SchedulingProvider.java | 30 ---------------
.../org/apache/samza/rest/SamzaRestService.java | 22 ++---------
.../apache/samza/monitor/TestMonitorService.java | 36 +++++++++++++-----
.../monitor/mock/InstantSchedulingProvider.java | 34 -----------------
7 files changed, 70 insertions(+), 141 deletions(-)
diff --git
a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
index ced6fc1..2291258 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorConfig.java
@@ -44,7 +44,11 @@ public class MonitorConfig extends MapConfig {
private static final String CONFIG_SCHEDULING_JITTER_PERCENT =
"scheduling.jitter.percent";
- private static final int DEFAULT_SCHEDULING_JITTER_PERCENT = 0;
+  // The default scheduling jitter is set to 100 percent so that every monitor is scheduled with an initial jitter.
+  // SAMZA-2595 creates a new scheduler/thread for every monitor. The jitter introduces a delay in the scheduling
+  // of each monitor so that the monitors do not all start at the same time, which avoids a load spike on the host
+  // and avoids overwhelming the metric reporting service.
+ private static final int DEFAULT_SCHEDULING_JITTER_PERCENT = 100;
public MonitorConfig(Config config) {
super(config);
diff --git
a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
index 5558917..6d84143 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
@@ -18,9 +18,18 @@
*/
package org.apache.samza.monitor;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.rest.SamzaRestConfig;
@@ -43,16 +52,14 @@ public class SamzaMonitorService {
private static final Logger LOGGER =
LoggerFactory.getLogger(SamzaMonitorService.class);
private static final SecureRandom RANDOM = new SecureRandom();
- private final SchedulingProvider scheduler;
private final SamzaRestConfig config;
private final MetricsRegistry metricsRegistry;
-
+ private final List<ScheduledExecutorService> scheduledExecutors;
public SamzaMonitorService(SamzaRestConfig config,
- MetricsRegistry metricsRegistry,
- SchedulingProvider schedulingProvider) {
+ MetricsRegistry metricsRegistry) {
this.config = config;
this.metricsRegistry = metricsRegistry;
- this.scheduler = schedulingProvider;
+ this.scheduledExecutors = new ArrayList<>();
}
public void start() {
@@ -67,8 +74,10 @@ public class SamzaMonitorService {
int monitorSchedulingJitterInMs = (int)
(RANDOM.nextInt(schedulingIntervalInMs + 1) *
(monitorConfig.getSchedulingJitterPercent() / 100.0));
schedulingIntervalInMs += monitorSchedulingJitterInMs;
LOGGER.info("Scheduling the monitor: {} to run every {} ms.",
monitorName, schedulingIntervalInMs);
- scheduler.schedule(getRunnable(instantiateMonitor(monitorName,
monitorConfig, metricsRegistry)),
- schedulingIntervalInMs);
+        // Create a new ScheduledExecutorService for each monitor. This ensures that a long-running monitor
+        // does not block another monitor from being scheduled or run. A long-running monitor also does not
+        // create a backlog of work for future executions of the same monitor: the next execution of a monitor
+        // starts only after its current execution is complete.
+ createSchedulerAndScheduleMonitor(monitorName, monitorConfig,
schedulingIntervalInMs);
} else {
// When MonitorFactoryClass is not defined in the config, ignore the
monitor config
LOGGER.warn("Not scheduling the monitor: {} to run, since monitor
factory class is not set in config.", monitorName);
@@ -81,7 +90,7 @@ public class SamzaMonitorService {
}
public void stop() {
- this.scheduler.stop();
+ scheduledExecutors.forEach(ExecutorService::shutdown);
}
private Runnable getRunnable(final Monitor monitor) {
@@ -100,4 +109,22 @@ public class SamzaMonitorService {
}
};
}
+
+  /**
+   * Instantiates the named monitor and schedules it on its own dedicated
+   * {@link ScheduledExecutorService} with a core pool size of 1.
+   *
+   * The monitor is run at a fixed rate: the first run is submitted with no initial delay and
+   * subsequent runs are scheduled every {@code schedulingIntervalInMs} milliseconds. The executor
+   * is recorded in {@code scheduledExecutors} so that {@link #stop()} can shut it down.
+   *
+   * @param monitorName name of the monitor to instantiate; also embedded in the worker thread's name
+   * @param monitorConfig configuration used to instantiate the monitor
+   * @param schedulingIntervalInMs period between successive runs, in milliseconds
+   * @throws InstantiationException if the monitor cannot be instantiated
+   */
+  @VisibleForTesting
+  public void createSchedulerAndScheduleMonitor(String monitorName, MonitorConfig monitorConfig,
+      long schedulingIntervalInMs) throws InstantiationException {
+    // Instantiate the monitor first: if this throws, no executor has been created or registered yet.
+    Runnable monitorRunnable = getRunnable(instantiateMonitor(monitorName, monitorConfig, metricsRegistry));
+
+    // Every monitor gets its own ThreadFactory, and the %d counter is per factory. Without the monitor
+    // name in the format, all monitor threads would be named "MonitorThread-0". Any '%' in the monitor
+    // name is escaped because the name format is interpreted as a format string.
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("MonitorThread-" + monitorName.replace("%", "%%") + "-%d")
+        .build();
+
+    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
+    scheduledExecutors.add(scheduledExecutorService);
+
+    // NOTE(review): the initial delay is 0, so every monitor's first run is submitted immediately;
+    // the configured jitter only affects the interval computed by the caller. Confirm this is intended.
+    scheduledExecutorService.scheduleAtFixedRate(monitorRunnable, 0, schedulingIntervalInMs, TimeUnit.MILLISECONDS);
+  }
}
diff --git
a/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
b/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
deleted file mode 100644
index e1f0711..0000000
---
a/samza-rest/src/main/java/org/apache/samza/monitor/ScheduledExecutorSchedulingProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.monitor;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-public class ScheduledExecutorSchedulingProvider implements SchedulingProvider
{
-
- private final ScheduledExecutorService scheduler;
-
- public ScheduledExecutorSchedulingProvider(ScheduledExecutorService
scheduler) {
- this.scheduler = scheduler;
- }
-
- public void schedule(Runnable runnable, int interval) {
- this.scheduler.scheduleAtFixedRate(runnable, 0, interval, MILLISECONDS);
- }
-
- public void stop() {
- this.scheduler.shutdownNow();
- }
-}
diff --git
a/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
b/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
deleted file mode 100644
index d68f6c1..0000000
--- a/samza-rest/src/main/java/org/apache/samza/monitor/SchedulingProvider.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.monitor;
-
-/**
- * Provides scheduling functionality to the SamzaMonitorService.
- */
-public interface SchedulingProvider {
- /* Schedule a the given Runnable to run() every INTERVAL ms. */
- void schedule(Runnable runnable, int intervalMs);
-
- /* Stop any future executions of any scheduled tasks. */
- void stop();
-}
diff --git
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
index c5d6af7..d48d1ab 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
@@ -18,9 +18,7 @@
*/
package org.apache.samza.rest;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
-import java.util.concurrent.ThreadFactory;
import joptsimple.OptionSet;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
@@ -29,7 +27,6 @@ import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.monitor.SamzaMonitorService;
-import org.apache.samza.monitor.ScheduledExecutorSchedulingProvider;
import org.apache.samza.util.CommandLine;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.Util;
@@ -42,8 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.Servlet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
/**
@@ -88,7 +83,7 @@ public class SamzaRestService {
*/
public static void main(String[] args)
throws Exception {
- ScheduledExecutorSchedulingProvider schedulingProvider = null;
+ SamzaMonitorService monitorService = null;
try {
SamzaRestConfig config = parseConfig(args);
ReadableMetricsRegistry metricsRegistry = new MetricsRegistryMap();
@@ -104,24 +99,15 @@ public class SamzaRestService {
ServletContainer container = new ServletContainer(samzaRestApplication);
restService.addServlet(container, "/*");
- // Schedule monitors to run
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
-
.setNameFormat("MonitorThread-%d")
- .build();
- ScheduledExecutorService schedulingService =
Executors.newScheduledThreadPool(1, threadFactory);
- schedulingProvider = new
ScheduledExecutorSchedulingProvider(schedulingService);
- SamzaMonitorService monitorService = new SamzaMonitorService(config,
- metricsRegistry,
- schedulingProvider);
+ monitorService = new SamzaMonitorService(config, metricsRegistry);
monitorService.start();
restService.runBlocking();
- monitorService.stop();
} catch (Throwable t) {
log.error("Exception in main.", t);
} finally {
- if (schedulingProvider != null) {
- schedulingProvider.stop();
+ if (monitorService != null) {
+ monitorService.stop();
}
}
}
diff --git
a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
index b2924fc..a4ed3d2 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
@@ -26,7 +26,6 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.monitor.mock.DummyMonitorFactory;
import org.apache.samza.monitor.mock.ExceptionThrowingMonitorFactory;
-import org.apache.samza.monitor.mock.InstantSchedulingProvider;
import org.apache.samza.monitor.mock.MockMonitorFactory;
import org.apache.samza.rest.SamzaRestConfig;
import org.apache.samza.util.NoOpMetricsRegistry;
@@ -40,6 +39,7 @@ import org.mockito.Mockito;
import static junit.framework.TestCase.assertTrue;
import static
org.apache.samza.monitor.MonitorConfig.CONFIG_MONITOR_FACTORY_CLASS;
+import static org.apache.samza.monitor.MonitorLoader.instantiateMonitor;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -96,8 +96,7 @@ public class TestMonitorService {
ExceptionThrowingMonitorFactory.class.getCanonicalName());
SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
SamzaMonitorService monitorService = new SamzaMonitorService(config,
-
METRICS_REGISTRY,
- new
InstantSchedulingProvider());
+
METRICS_REGISTRY);
// This will throw if the exception isn't caught within the provider.
monitorService.start();
@@ -115,9 +114,28 @@ public class TestMonitorService {
MockMonitorFactory.class.getCanonicalName());
SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
- SamzaMonitorService monitorService = new SamzaMonitorService(config,
-
METRICS_REGISTRY,
- new
InstantSchedulingProvider());
+
+    // Test double for SamzaMonitorService that runs each monitor once, synchronously, on the calling
+    // thread instead of handing it to a scheduled executor.
+    class SamzaMonitorServiceTest extends SamzaMonitorService {
+      // Local copy of the registry passed to the constructor, used when instantiating monitors below.
+      private final MetricsRegistry metricsRegistry;
+
+      public SamzaMonitorServiceTest(SamzaRestConfig config, MetricsRegistry metricsRegistry) {
+        super(config, metricsRegistry);
+        this.metricsRegistry = metricsRegistry;
+      }
+
+      @Override
+      public void createSchedulerAndScheduleMonitor(String monitorName, MonitorConfig monitorConfig,
+          long schedulingIntervalInMs) {
+        try {
+          // immediately run monitor, without scheduling
+          instantiateMonitor(monitorName, monitorConfig, metricsRegistry).monitor();
+        } catch (Exception e) {
+          // Report which monitor failed and why, instead of failing with no diagnostic.
+          fail("Monitor " + monitorName + " could not be instantiated or run: " + e);
+        }
+      }
+    }
+
+ SamzaMonitorService monitorService = new SamzaMonitorServiceTest(config,
+ METRICS_REGISTRY);
+
try {
monitorService.start();
} catch (Exception e) {
@@ -134,8 +152,7 @@ public class TestMonitorService {
"RandomClassName");
SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
SamzaMonitorService monitorService = new SamzaMonitorService(config,
-
METRICS_REGISTRY,
- new
InstantSchedulingProvider());
+
METRICS_REGISTRY);
monitorService.start();
}
@@ -143,7 +160,6 @@ public class TestMonitorService {
public void testScheduledExecutorSchedulingProvider() {
// Test that the monitor is scheduled by the
ScheduledExecutorSchedulingProvider
ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1);
- ScheduledExecutorSchedulingProvider provider = new
ScheduledExecutorSchedulingProvider(executorService);
// notifyingMonitor.monitor() should be called repeatedly.
final CountDownLatch wasCalledLatch = new CountDownLatch(3);
@@ -167,7 +183,7 @@ public class TestMonitorService {
};
// monitor should get called every 1ms, so if await() misses the first
call, there will be more.
- provider.schedule(runnableMonitor, 1);
+ executorService.scheduleAtFixedRate(runnableMonitor, 0, 1,
TimeUnit.MILLISECONDS);
try {
assertTrue(wasCalledLatch.await(5L, TimeUnit.SECONDS));
diff --git
a/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
b/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
deleted file mode 100644
index 5c06911..0000000
---
a/samza-rest/src/test/java/org/apache/samza/monitor/mock/InstantSchedulingProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.monitor.mock;
-
-import org.apache.samza.monitor.SchedulingProvider;
-
-/**
- * Instead of scheduling a monitor to run, just runs it ASAP.
- */
-public class InstantSchedulingProvider implements SchedulingProvider {
-
- public void schedule(Runnable runnableMonitor, int interval) {
- runnableMonitor.run();
- }
-
- // Nothing to stop because no deferred task was started
- public void stop() {}
-}