Repository: storm
Updated Branches:
  refs/heads/master 5d601e8d5 -> 8e04115e9


Adding whitelist for scheduler strategies


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f9d75d8a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f9d75d8a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f9d75d8a

Branch: refs/heads/master
Commit: f9d75d8a38d77ec725b7f3b58d6666196bac62e0
Parents: 03578ca
Author: Kyle Nusbaum <knusb...@yahoo-inc.com>
Authored: Mon Jul 10 16:30:22 2017 -0500
Committer: Kyle Nusbaum <knusb...@yahoo-inc.com>
Committed: Wed Aug 9 16:53:35 2017 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/Config.java        |  7 ++++
 .../utils/DisallowedStrategyException.java      | 40 ++++++++++++++++++++
 .../org/apache/storm/utils/ReflectionUtils.java | 12 ++++++
 .../jvm/org/apache/storm/utils/UtilsTest.java   | 35 +++++++++++++++++
 .../resource/ResourceAwareScheduler.java        | 23 ++++++++---
 5 files changed, 111 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f9d75d8a/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java 
b/storm-client/src/jvm/org/apache/storm/Config.java
index 9d659d5..cdf4fee 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1314,6 +1314,13 @@ public class Config extends HashMap<String, Object> {
     public static final String NIMBUS_IMPERSONATION_ACL = 
"nimbus.impersonation.acl";
 
     /**
+     * A whitelist of the RAS scheduler strategies allowed by nimbus. Should 
be a list of fully-qualified class names
+     * or null to allow all.
+     */
+    @isStringList
+    public static final String NIMBUS_SCHEDULER_STRATEGY_CLASS_WHITELIST = 
"nimbus.scheduler.strategy.class.whitelist";
+
+    /**
      * Full path to the worker-laucher executable that will be used to lauch 
workers when
      * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/f9d75d8a/storm-client/src/jvm/org/apache/storm/utils/DisallowedStrategyException.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/utils/DisallowedStrategyException.java 
b/storm-client/src/jvm/org/apache/storm/utils/DisallowedStrategyException.java
new file mode 100644
index 0000000..41145a9
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/utils/DisallowedStrategyException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.List;
+
+public class DisallowedStrategyException extends RuntimeException {
+    private String attemptedClass;
+    private List<String>allowedStrategies;
+
+    
+    public DisallowedStrategyException(String attemptedClass, List<String> 
allowedStrategies) {
+        this.attemptedClass = attemptedClass;
+        this.allowedStrategies = allowedStrategies;
+    }
+
+    public String getAttemptedClass() {
+        return attemptedClass;
+    }
+
+    public List<String> getAllowedStrategies() {
+        return allowedStrategies;
+    }    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f9d75d8a/storm-client/src/jvm/org/apache/storm/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ReflectionUtils.java 
b/storm-client/src/jvm/org/apache/storm/utils/ReflectionUtils.java
index 174bb8c..1606bb6 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ReflectionUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ReflectionUtils.java
@@ -19,6 +19,8 @@
 package org.apache.storm.utils;
 
 import java.util.Map;
+import java.util.List;
+import org.apache.storm.Config;
 
 public class ReflectionUtils {
     // A singleton instance allows us to mock delegated static methods in our
@@ -72,6 +74,16 @@ public class ReflectionUtils {
         }
     }
 
+    public static <T> T newSchedulerStrategyInstance(String klass, Map<String, 
Object> conf) {
+        List<String> allowedSchedulerStrategies = (List<String>) 
conf.get(Config.NIMBUS_SCHEDULER_STRATEGY_CLASS_WHITELIST);
+        if(allowedSchedulerStrategies == null || 
allowedSchedulerStrategies.contains(klass)) {
+            return newInstance(klass);
+        }
+        else {
+            throw new DisallowedStrategyException(klass, 
allowedSchedulerStrategies);
+        }
+    }
+
     // Non-static impl methods exist for mocking purposes.
     public <T> T newInstanceImpl(Class<T> klass) {
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/f9d75d8a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java 
b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
index 496b7cf..fc042c2 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/UtilsTest.java
@@ -22,11 +22,14 @@ import org.apache.storm.Config;
 import org.apache.thrift.transport.TTransportException;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.ArrayList;
 
 public class UtilsTest {
 
@@ -173,4 +176,36 @@ public class UtilsTest {
         }
     }
 
+    @Rule
+    public final ExpectedException schedulerException = 
ExpectedException.none();
+
+    @Test
+    public void testSchedulerStrategyWhitelist() {
+        Map<String, Object> config = ConfigUtils.readStormConfig();
+        String allowed = 
"org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy";
+        config.put(Config.NIMBUS_SCHEDULER_STRATEGY_CLASS_WHITELIST, 
Arrays.asList(allowed));
+
+        Object sched = ReflectionUtils.newSchedulerStrategyInstance(allowed, 
config);
+        Assert.assertEquals(sched.getClass().getName(), allowed);
+    }
+
+    @Test
+    public void testSchedulerStrategyWhitelistException() {
+        Map<String, Object> config = ConfigUtils.readStormConfig();
+        String allowed = 
"org.apache.storm.scheduler.resource.strategies.scheduling.SomeNonExistantStrategy";
+        String notAllowed = 
"org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy";
+        config.put(Config.NIMBUS_SCHEDULER_STRATEGY_CLASS_WHITELIST, 
Arrays.asList(allowed));
+
+        schedulerException.expect(DisallowedStrategyException.class);
+        ReflectionUtils.newSchedulerStrategyInstance(notAllowed, config);
+    }
+
+    @Test
+    public void testSchedulerStrategyEmptyWhitelist() {
+        Map<String, Object> config = ConfigUtils.readStormConfig();
+        String allowed = 
"org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy";
+
+        Object sched = ReflectionUtils.newSchedulerStrategyInstance(allowed, 
config);
+        Assert.assertEquals(sched.getClass().getName(), allowed);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f9d75d8a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 0b9e800..6ceb0cd 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -25,6 +25,7 @@ import 
org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriori
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ReflectionUtils;
+import org.apache.storm.utils.DisallowedStrategyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,6 +120,13 @@ public class ResourceAwareScheduler implements IScheduler {
         updateSupervisorsResources(cluster, topologies);
     }
 
+    private void handleSchedulingError(TopologyDetails td, SchedulingState 
schedulingState, Exception e) {
+        LOG.error("failed to create instance of IStrategy: {} with error: {}! 
Topology {} will not be scheduled.",
+                  td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), 
e.getMessage(), td.getName());
+        User topologySubmitter = cleanup(schedulingState, td);
+        topologySubmitter.moveTopoFromPendingToInvalid(td);
+    }
+
     public void scheduleTopology(TopologyDetails td) {
         User topologySubmitter = 
this.schedulingState.userMap.get(td.getTopologySubmitter());
         if (this.schedulingState.cluster.getUnassignedExecutors(td).size() > 
0) {
@@ -127,14 +135,17 @@ public class ResourceAwareScheduler implements IScheduler 
{
             SchedulingState schedulingState = checkpointSchedulingState();
             IStrategy rasStrategy = null;
             try {
-                rasStrategy = (IStrategy) ReflectionUtils.newInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
+                rasStrategy = (IStrategy) 
ReflectionUtils.newSchedulerStrategyInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
+            } catch (DisallowedStrategyException e) {
+                handleSchedulingError(td, schedulingState, e);
+                this.schedulingState.cluster.setStatus(td.getId(), 
"Unsuccessful in scheduling - " + e.getAttemptedClass()
+                                                       + " is not an allowed 
strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
+                                                       + " config is one of 
the allowed strategies: " + e.getAllowedStrategies().toString());
+                return;
             } catch (RuntimeException e) {
-                LOG.error("failed to create instance of IStrategy: {} with 
error: {}! Topology {} will not be scheduled.",
-                        td.getName(), 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
-                topologySubmitter = cleanup(schedulingState, td);
-                topologySubmitter.moveTopoFromPendingToInvalid(td);
+                handleSchedulingError(td, schedulingState, e);
                 this.schedulingState.cluster.setStatus(td.getId(), 
"Unsuccessful in scheduling - failed to create instance of topology strategy "
-                        + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) 
+ ". Please check logs for details");
+                                                       + 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for 
details");
                 return;
             }
             IEvictionStrategy evictionStrategy = null;

Reply via email to