Repository: samza
Updated Branches:
  refs/heads/master eed44d52e -> 2aa9f893d


SAMZA-1039 Selective loading of monitors in samza rest


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2aa9f893
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2aa9f893
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2aa9f893

Branch: refs/heads/master
Commit: 2aa9f893dcf6c8403aa0ff68faefebd36f900c1c
Parents: eed44d5
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Wed Oct 19 11:42:36 2016 -0700
Committer: Jacob Maes <[email protected]>
Committed: Wed Oct 19 11:45:03 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../samza/monitor/SamzaMonitorService.java      | 18 +++++++---
 .../samza/monitor/TestMonitorService.java       | 38 ++++++++++++++++++++
 .../samza/monitor/mock/MockMonitorFactory.java  | 38 ++++++++++++++++++++
 4 files changed, 90 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 98839f2..ac0213e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -556,6 +556,7 @@ project(":samza-rest") {
 
     testCompile "junit:junit:$junitVersion"
     testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:$jerseyVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
 
   tasks.create(name: "releaseRestServiceTar", type: Tar) {

http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
index ce947f7..754ad82 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.monitor;
 
+import com.google.common.base.Strings;
 import java.util.Map;
 import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -56,12 +57,19 @@ public class SamzaMonitorService {
         try {
             Map<String, MonitorConfig> monitorConfigs = getMonitorConfigs(config);
             for (Map.Entry<String, MonitorConfig> entry : monitorConfigs.entrySet()) {
+                String monitorName = entry.getKey();
                 MonitorConfig monitorConfig = entry.getValue();
-                int schedulingIntervalInMs = monitorConfig.getSchedulingIntervalInMs();
-                LOGGER.info("Scheduling monitor {} to run every {} ms", entry.getKey(), schedulingIntervalInMs);
-                // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics.
-                // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet.
-                scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs);
+
+                if (!Strings.isNullOrEmpty(monitorConfig.getMonitorFactoryClass())) {
+                    int schedulingIntervalInMs = monitorConfig.getSchedulingIntervalInMs();
+                    LOGGER.info("Scheduling monitor {} to run every {} ms", monitorName, schedulingIntervalInMs);
+                    // MetricsRegistry has been added in the Monitor interface, since it's required in the eventual future to record metrics.
+                    // We have plans to record metrics, hence adding this as a placeholder. We just aren't doing it yet.
+                    scheduler.schedule(getRunnable(instantiateMonitor(monitorConfig, metricsRegistry)), schedulingIntervalInMs);
+                } else {
+                  // When MonitorFactoryClass is not defined in the config, ignore the monitor config
+                  LOGGER.warn("Not scheduling the monitor: {} to run, since monitor factory class is not set in config.", monitorName);
+                }
             }
         } catch (InstantiationException e) {
             LOGGER.error("Exception when instantiating the monitor : ", e);

http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
index 4618b54..e75f494 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestMonitorService.java
@@ -21,11 +21,13 @@ package org.apache.samza.monitor;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.monitor.mock.DummyMonitorFactory;
 import org.apache.samza.monitor.mock.ExceptionThrowingMonitorFactory;
 import org.apache.samza.monitor.mock.InstantSchedulingProvider;
+import org.apache.samza.monitor.mock.MockMonitorFactory;
 import org.apache.samza.rest.SamzaRestConfig;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
@@ -34,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.mockito.Mockito;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.samza.monitor.MonitorConfig.CONFIG_MONITOR_FACTORY_CLASS;
@@ -102,6 +105,41 @@ public class TestMonitorService {
     }
 
     @Test
+    public void testShouldNotFailWhenTheMonitorFactoryClassIsNotDefined()
+        throws Exception {
+        // Test that when MonitorFactoryClass is not defined in the config, monitor service
+        // should not fail.
+        Map<String, String> configMap = ImmutableMap.of("monitor.monitor1.config.key1", "configValue1",
+                                                        "monitor.monitor1.config.key2", "configValue2",
+                                                        String.format("monitor.MOCK_MONITOR.%s", CONFIG_MONITOR_FACTORY_CLASS),
+                                                        MockMonitorFactory.class.getCanonicalName());
+
+        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
+        SamzaMonitorService monitorService = new SamzaMonitorService(config,
+                                                                     METRICS_REGISTRY,
+                                                                     new InstantSchedulingProvider());
+        try {
+            monitorService.start();
+        } catch (Exception e) {
+            fail();
+        }
+        Mockito.verify(MockMonitorFactory.MOCK_MONITOR, Mockito.times(1)).monitor();
+    }
+
+    @Test(expected = SamzaException.class)
+    public void testShouldFailWhenTheMonitorFactoryClassIsInvalid() {
+        // Test that when MonitorFactoryClass is defined in the config and is invalid,
+        // monitor service should fail. Should throw back SamzaException.
+        Map<String, String> configMap = ImmutableMap.of(String.format("monitor.name.%s", CONFIG_MONITOR_FACTORY_CLASS),
+                                                        "RandomClassName");
+        SamzaRestConfig config = new SamzaRestConfig(new MapConfig(configMap));
+        SamzaMonitorService monitorService = new SamzaMonitorService(config,
+                                                                     METRICS_REGISTRY,
+                                                                     new InstantSchedulingProvider());
+        monitorService.start();
+    }
+
+    @Test
     public void testScheduledExecutorSchedulingProvider() {
         // Test that the monitor is scheduled by the ScheduledExecutorSchedulingProvider
         ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

http://git-wip-us.apache.org/repos/asf/samza/blob/2aa9f893/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java b/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java
new file mode 100644
index 0000000..5b19001
--- /dev/null
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/mock/MockMonitorFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor.mock;
+
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.monitor.Monitor;
+import org.apache.samza.monitor.MonitorConfig;
+import org.apache.samza.monitor.MonitorFactory;
+import org.mockito.Mockito;
+
+
+public class MockMonitorFactory implements MonitorFactory {
+
+  public static final Monitor MOCK_MONITOR = Mockito.mock(Monitor.class);
+
+  @Override
+  public Monitor getMonitorInstance(MonitorConfig config, MetricsRegistry metricsRegistry)
+      throws Exception {
+    Mockito.reset(MOCK_MONITOR);
+    return MOCK_MONITOR;
+  }
+}

Reply via email to