Repository: samza Updated Branches: refs/heads/master eed44d52e -> 2aa9f893d
SAMZA-1039 Selective loading of monitors in samza rest Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2aa9f893 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2aa9f893 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2aa9f893 Branch: refs/heads/master Commit: 2aa9f893dcf6c8403aa0ff68faefebd36f900c1c Parents: eed44d5 Author: Shanthoosh Venkataraman <[email protected]> Authored: Wed Oct 19 11:42:36 2016 -0700 Committer: Jacob Maes <[email protected]> Committed: Wed Oct 19 11:45:03 2016 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../samza/monitor/SamzaMonitorService.java | 18 +++++++--- .../samza/monitor/TestMonitorService.java | 38 ++++++++++++++++++++ .../samza/monitor/mock/MockMonitorFactory.java | 38 ++++++++++++++++++++ 4 files changed, 90 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 98839f2..ac0213e 100644 --- a/build.gradle +++ b/build.gradle @@ -556,6 +556,7 @@ project(":samza-rest") { testCompile "junit:junit:$junitVersion" testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:$jerseyVersion" + testCompile "org.mockito:mockito-all:$mockitoVersion" } tasks.create(name: "releaseRestServiceTar", type: Tar) { http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java index ce947f7..754ad82 100644 --- a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java +++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java @@ -18,6 +18,7 @@ */ package org.apache.samza.monitor; +import com.google.common.base.Strings; import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.metrics.MetricsRegistry; @@ -56,12 +57,19 @@ public class SamzaMonitorService { try { Map<String, MonitorConfig> monitorConfigs = getMonitorConfigs(config); for (Map.Entry<String, MonitorConfig> entry : monitorConfigs.entrySet()) { + String monitorName = entry.getKey(); MonitorConfig monitorConfig = entry.getValue(); - int schedulingIntervalInMs = monitorConfig.getSchedulingIntervalInMs(); - LOGGER.info("Scheduling monitor {} to run every {} ms", entry.getKey(), schedulingIntervalInMs); - // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics. - // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet. - scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs); + + if (!Strings.isNullOrEmpty(monitorConfig.getMonitorFactoryClass())) { + int schedulingIntervalInMs = monitorConfig.getSchedulingIntervalInMs(); + LOGGER.info("Scheduling monitor {} to run every {} ms", monitorName, schedulingIntervalInMs); + // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics. + // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet. + scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs); + } else { + // When MonitorFactoryClass is not defined in the config, ignore the monitor config + LOGGER.warn("Not scheduling the monitor: {} to run, since monitor factory class is not set in config.", monitorName); + } } } catch (InstantiationException e) { LOGGER.error("Exception when instantiating the monitor : ", e); http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java index 4618b54..e75f494 100644 --- a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java +++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java @@ -21,11 +21,13 @@ package org.apache.samza.monitor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.Map; +import org.apache.samza.SamzaException; import org.apache.samza.config.MapConfig; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.monitor.mock.DummyMonitorFactory; import org.apache.samza.monitor.mock.ExceptionThrowingMonitorFactory; import org.apache.samza.monitor.mock.InstantSchedulingProvider; +import org.apache.samza.monitor.mock.MockMonitorFactory; import org.apache.samza.rest.SamzaRestConfig; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Test; @@ -34,6 +36,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.mockito.Mockito; import static junit.framework.TestCase.assertTrue; import static org.apache.samza.monitor.MonitorConfig.CONFIG_MONITOR_FACTORY_CLASS; @@ -102,6 +105,41 @@ public class TestMonitorService { } @Test + public void testShouldNotFailWhenTheMonitorFactoryClassIsNotDefined() + throws Exception { + // Test that when MonitorFactoryClass is not defined in the config, monitor service + // should not fail. + Map<String, String> configMap = ImmutableMap.of("monitor.monitor1.config.key1", "configValue1", + "monitor.monitor1.config.key2", "configValue2", + String.format("monitor.MOCK_MONITOR.%s", CONFIG_MONITOR_FACTORY_CLASS), + MockMonitorFactory.class.getCanonicalName()); + + SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap)); + SamzaMonitorService monitorService = new SamzaMonitorService(config, + METRICS_REGISTRY, + new InstantSchedulingProvider()); + try { + monitorService.start(); + } catch (Exception e) { + fail(); + } + Mockito.verify(MockMonitorFactory.MOCK_MONITOR, Mockito.times(1)).monitor(); + } + + @Test(expected = SamzaException.class) + public void testShouldFailWhenTheMonitorFactoryClassIsInvalid() { + // Test that when MonitorFactoryClass is defined in the config and is invalid, + // monitor service should fail. Should throw back SamzaException. + Map<String, String> configMap = ImmutableMap.of(String.format("monitor.name.%s", CONFIG_MONITOR_FACTORY_CLASS), + "RandomClassName"); + SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap)); + SamzaMonitorService monitorService = new SamzaMonitorService(config, + METRICS_REGISTRY, + new InstantSchedulingProvider()); + monitorService.start(); + } + + @Test public void testScheduledExecutorSchedulingProvider() { // Test that the monitor is scheduled by the ScheduledExecutorSchedulingProvider ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java new file mode 100644 index 0000000..5b19001 --- /dev/null +++ b/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.monitor.mock; + +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.monitor.Monitor; +import org.apache.samza.monitor.MonitorConfig; +import org.apache.samza.monitor.MonitorFactory; +import org.mockito.Mockito; + + +public class MockMonitorFactory implements MonitorFactory { + + public static final Monitor MOCK_MONITOR = Mockito.mock(Monitor.class); + + @Override + public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry) + throws Exception { + Mockito.reset(MOCK_MONITOR); + return MOCK_MONITOR; + } +}
