http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java new file mode 100644 index 0000000..6a4a67a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyActions.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.nimbus; + +/** + * Actions that can be done to a topology in nimbus + */ +public enum TopologyActions { + STARTUP, + INACTIVATE, + ACTIVATE, + REBALANCE, + KILL, + DO_REBALANCE, + REMOVE +}
http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java new file mode 100644 index 0000000..ab9b212 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyResources.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.nimbus; + +public final class TopologyResources { + private final Double requestedMemOnHeap; + private final Double requestedMemOffHeap; + private final Double requestedCpu; + private final Double assignedMemOnHeap; + private final Double assignedMemOffHeap; + private final Double assignedCpu; + + public TopologyResources(Double requestedMemOnHeap, Double requestedMemOffHeap, + Double requestedCpu, Double assignedMemOnHeap, Double assignedMemOffHeap, + Double assignedCpu) { + this.requestedMemOnHeap = requestedMemOnHeap; + this.requestedMemOffHeap = requestedMemOffHeap; + this.requestedCpu = requestedCpu; + this.assignedMemOnHeap = assignedMemOnHeap; + this.assignedMemOffHeap = assignedMemOffHeap; + this.assignedCpu = assignedCpu; + + } + + public Double getRequestedMemOnHeap() { + return requestedMemOnHeap; + } + + public Double getRequestedMemOffHeap() { + return requestedMemOffHeap; + } + + public Double getRequestedCpu() { + return requestedCpu; + } + + public Double getAssignedMemOnHeap() { + return assignedMemOnHeap; + } + + public Double getAssignedMemOffHeap() { + return assignedMemOffHeap; + } + + public Double getAssignedCpu() { + return assignedCpu; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java new file mode 100644 index 0000000..39151e4 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/nimbus/TopologyStateTransition.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.nimbus; + +import org.apache.storm.generated.StormBase; + +/** + * A transition from one state to another + */ +interface TopologyStateTransition { + StormBase transition(Object argument, Nimbus nimbus, String topoId, StormBase base) throws Exception; +} http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java index d02cfa1..b2af336 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java @@ -17,6 +17,8 @@ */ package org.apache.storm.daemon.supervisor; +import static org.apache.storm.utils.Utils.OR; + import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -610,16 +612,6 @@ public class BasicContainer extends Container { return workerProfilerChildopts; } - /** - * a or b the first one that is not null - * @param a something - * @param b something else - * @return a or b the first one that is not null - */ - private <V> V OR(V a, V b) { - return a == null ? b : a; - } - protected String javaCmd(String cmd) { String ret = null; String javaHome = System.getenv().get("JAVA_HOME"); @@ -663,7 +655,7 @@ public class BasicContainer extends Container { commandList.addAll(commonParams); commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap)); commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap)); - commandList.addAll(substituteChildopts(OR( + commandList.addAll(substituteChildopts(Utils.OR( _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS), _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap)); commandList.addAll(getWorkerProfilerChildOpts(memOnheap)); http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java index 9d31c87..12709ac 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -142,8 +142,12 @@ public class Worker implements Shutdownable, DaemonCommon { IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext()); Credentials initialCredentials = stormClusterState.credentials(topologyId, null); + Map<String, String> initCreds = new HashMap<>(); + if (initialCredentials != null) { + initCreds.putAll(initialCredentials.get_creds()); + } autoCreds = AuthUtils.GetAutoCredentials(topologyConf); - subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds()); + subject = AuthUtils.populateSubject(null, autoCreds, initCreds); Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { @@ -187,11 +191,11 @@ public class Worker implements Shutdownable, DaemonCommon { for (List<Long> e : workerState.getExecutors()) { if (ConfigUtils.isLocalMode(topologyConf)) { newExecutors.add( - LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds()) + LocalExecutor.mkExecutor(workerState, e, initCreds) .execute()); } else { newExecutors.add( - Executor.mkExecutor(workerState, e, initialCredentials.get_creds()) + Executor.mkExecutor(workerState, e, initCreds) .execute()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java b/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java index c933c31..1530239 100644 --- a/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java +++ b/storm-core/src/jvm/org/apache/storm/nimbus/NimbusInfo.java @@ -18,6 +18,7 @@ package org.apache.storm.nimbus; import org.apache.storm.Config; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +28,7 @@ import java.net.UnknownHostException; import java.util.Map; public class NimbusInfo implements Serializable { + private static final long serialVersionUID = 2161446155116099333L; private static final Logger LOG = LoggerFactory.getLogger(NimbusInfo.class); private static final String DELIM = ":"; @@ -49,17 +51,17 @@ public class NimbusInfo implements Serializable { } } - public static NimbusInfo fromConf(Map conf) { + public static NimbusInfo fromConf(Map<String, Object> conf) { try { String host = InetAddress.getLocalHost().getCanonicalHostName(); if (conf.containsKey(Config.STORM_LOCAL_HOSTNAME)) { - host = conf.get(Config.STORM_LOCAL_HOSTNAME).toString(); + host = (String) conf.get(Config.STORM_LOCAL_HOSTNAME); LOG.info("Overriding nimbus host to storm.local.hostname -> {}", host); } else { LOG.info("Nimbus figures out its name to {}", host); } - int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString()); + int port = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT), 6627); return new NimbusInfo(host, port, false); } catch (UnknownHostException e) { http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java index dffeb47..1b7d7f8 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java @@ -46,6 +46,22 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { } @Override + public String toString() { + return this.getClass().getSimpleName() + " topo: " + topologyId + " execToSlots: " + executorToSlot; + } + + @Override + public boolean equals(Object other) { + if (other == this) return true; + if (other instanceof SchedulerAssignmentImpl) { + SchedulerAssignmentImpl sother = (SchedulerAssignmentImpl) other; + return topologyId.equals(sother.topologyId) && + executorToSlot.equals(sother.executorToSlot); + } + return false; + } + + @Override public Set<WorkerSlot> getSlots() { return new HashSet<>(executorToSlot.values()); } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java b/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java index 1f4ebd7..0ed6f8d 100644 --- a/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java +++ b/storm-core/src/jvm/org/apache/storm/scheduler/SupervisorDetails.java @@ -49,7 +49,7 @@ public class SupervisorDetails { private Map<String, Double> _total_resources; public SupervisorDetails(String id, String host, Object meta, Object schedulerMeta, - Collection<Number> allPorts, Map<String, Double> total_resources){ + Collection<? extends Number> allPorts, Map<String, Double> total_resources){ this.id = id; this.host = host; @@ -72,23 +72,29 @@ public class SupervisorDetails { this(id, null, meta, null, null, total_resources); } - public SupervisorDetails(String id, Object meta, Collection<Number> allPorts){ + public SupervisorDetails(String id, Object meta, Collection<? extends Number> allPorts){ this(id, null, meta, null, allPorts, null); } - public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<Number> allPorts) { + public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<? extends Number> allPorts) { this(id, host, null, schedulerMeta, allPorts, null); } public SupervisorDetails(String id, String host, Object schedulerMeta, - Collection<Number> allPorts, Map<String, Double> total_resources) { + Collection<? extends Number> allPorts, Map<String, Double> total_resources) { this(id, host, null, schedulerMeta, allPorts, total_resources); } + + @Override + public String toString() { + return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + " META: " + meta + + " SCHED_META: " + schedulerMeta + " PORTS: " + allPorts; + } - private void setAllPorts(Collection<Number> allPorts) { + private void setAllPorts(Collection<? extends Number> allPorts) { this.allPorts = new HashSet<>(); - if(allPorts!=null) { - for(Number n: allPorts) { + if (allPorts!=null) { + for (Number n: allPorts) { this.allPorts.add(n.intValue()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java index 3c6e961..353f679 100644 --- a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java +++ b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java @@ -153,12 +153,20 @@ public class AuthUtils { * @param storm_conf storm configuration * @return the plugin */ - public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map storm_conf) { - IPrincipalToLocal ptol; + public static IPrincipalToLocal GetPrincipalToLocalPlugin(Map<String, Object> storm_conf) { + IPrincipalToLocal ptol = null; try { String ptol_klassName = (String) storm_conf.get(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN); - ptol = Utils.newInstance(ptol_klassName); - ptol.prepare(storm_conf); + if (ptol_klassName == null) { + LOG.warn("No principal to local given {}", Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN); + } else { + ptol = Utils.newInstance(ptol_klassName); + //TODO this can only ever be null if someone is doing something odd with mocking + // We should really fix the mocking and remove this + if (ptol != null) { + ptol.prepare(storm_conf); + } + } } catch (Exception e) { throw new RuntimeException(e); } @@ -167,15 +175,21 @@ public class AuthUtils { /** * Construct a group mapping service provider plugin - * @param storm_conf storm configuration + * @param conf daemon configuration * @return the plugin */ - public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map storm_conf) { - IGroupMappingServiceProvider gmsp; + public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map conf) { + IGroupMappingServiceProvider gmsp = null; try { - String gmsp_klassName = (String) storm_conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN); - gmsp = Utils.newInstance(gmsp_klassName); - gmsp.prepare(storm_conf); + String gmsp_klassName = (String) conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN); + if (gmsp_klassName == null) { + LOG.warn("No group mapper given {}", Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN); + } else { + gmsp = Utils.newInstance(gmsp_klassName); + if (gmsp != null) { + gmsp.prepare(conf); + } + } } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java b/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java index 5d7f702..8ea42d5 100644 --- a/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java +++ b/storm-core/src/jvm/org/apache/storm/security/auth/ShellBasedGroupsMapping.java @@ -56,12 +56,17 @@ public class ShellBasedGroupsMapping implements */ @Override public Set<String> getGroups(String user) throws IOException { - if(cachedGroups.containsKey(user)) { - return cachedGroups.get(user); + synchronized(this) { + if (cachedGroups.containsKey(user)) { + return cachedGroups.get(user); + } } Set<String> groups = getUnixGroups(user); - if(!groups.isEmpty()) - cachedGroups.put(user,groups); + if(!groups.isEmpty()) { + synchronized (this) { + cachedGroups.put(user,groups); + } + } return groups; } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index 0524dc8..bd6f32d 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -36,6 +36,7 @@ import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.generated.ExecutorSummary; import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.NodeInfo; import org.apache.storm.generated.SpecificAggregateStats; import org.apache.storm.generated.SpoutAggregateStats; import org.apache.storm.generated.SpoutStats; @@ -51,11 +52,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; @SuppressWarnings("unchecked") @@ -1353,7 +1356,7 @@ public class StatsUtil { } return new ArrayList<WorkerSummary>(workerSummaryMap.values()); } - + /** * Aggregate statistics per worker for a topology. Optionally filtering on specific supervisors * @@ -1505,19 +1508,18 @@ public class StatsUtil { * @return a list of host+port */ public static List<Map<String, Object>> extractNodeInfosFromHbForComp( - Map exec2hostPort, Map task2component, boolean includeSys, String compId) { + Map<List<? extends Number>, List<Object>> exec2hostPort, Map<Integer, String> task2component, boolean includeSys, String compId) { List<Map<String, Object>> ret = new ArrayList<>(); Set<List> hostPorts = new HashSet<>(); - for (Object o : exec2hostPort.entrySet()) { - Map.Entry entry = (Map.Entry) o; - List key = (List) entry.getKey(); - List value = (List) entry.getValue(); + for (Entry<List<? extends Number>, List<Object>> entry : exec2hostPort.entrySet()) { + List<? extends Number> key = entry.getKey(); + List<Object> value = entry.getValue(); - Integer start = ((Number) key.get(0)).intValue(); + Integer start = key.get(0).intValue(); String host = (String) value.get(0); Integer port = (Integer) value.get(1); - String comp = (String) task2component.get(start); + String comp = task2component.get(start); if ((compId == null || compId.equals(comp)) && (includeSys || !Utils.isSystemId(comp))) { hostPorts.add(Lists.newArrayList(host, port)); } @@ -1548,10 +1550,10 @@ public class StatsUtil { * @param timeout timeout * @return a HashMap of updated executor heart beats */ - public static Map<List<Integer>, Object> updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache, + public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache, Map<List<Integer>, Map<String, Object>> executorBeats, Set<List<Integer>> executors, Integer timeout) { - Map<List<Integer>, Object> ret = new HashMap<>(); + Map<List<Integer>, Map<String, Object>> ret = new HashMap<>(); if (cache == null && executorBeats == null) { return ret; } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java b/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java index 7420b85..40dd3b7 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java +++ b/storm-core/src/jvm/org/apache/storm/utils/BufferInputStream.java @@ -22,7 +22,7 @@ import java.io.InputStream; import java.util.Arrays; -public class BufferInputStream { +public class BufferInputStream implements AutoCloseable { byte[] buffer; InputStream stream; @@ -47,6 +47,7 @@ public class BufferInputStream { } } + @Override public void close() throws IOException { stream.close(); } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java index e2be8a7..1c5dca4 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -168,8 +168,8 @@ public class ConfigUtils { return conf; } - public static Map readYamlConfig(String name, boolean mustExist) { - Map conf = Utils.findAndReadConfigFile(name, mustExist); + public static Map<String, Object> readYamlConfig(String name, boolean mustExist) { + Map<String, Object> conf = Utils.findAndReadConfigFile(name, mustExist); ConfigValidation.validateFields(conf); return conf; } http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index 8577c22..9d3b1d4 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -1665,16 +1665,18 @@ public class Utils { /** * Creates a new map with a string value in the map replaced with an - * equivalently-lengthed string of '#'. + * equivalently-lengthed string of '#'. (If the object is not a string + * to string will be called on it and replaced) * @param m The map that a value will be redacted from * @param key The key pointing to the value to be redacted * @return a new map with the value redacted. The original map will not be modified. */ - public static Map<Object, String> redactValue(Map<Object, String> m, Object key) { - if(m.containsKey(key)) { - HashMap<Object, String> newMap = new HashMap<>(m); - String value = newMap.get(key); - String redacted = new String(new char[value.length()]).replace("\0", "#"); + public static Map<String, Object> redactValue(Map<String, Object> m, String key) { + if (m.containsKey(key)) { + HashMap<String, Object> newMap = new HashMap<>(m); + Object value = newMap.get(key); + String v = value.toString(); + String redacted = new String(new char[v.length()]).replace("\0", "#"); newMap.put(key, redacted); return newMap; } @@ -1962,7 +1964,7 @@ public class Utils { return dir + FILE_PATH_SEPARATOR + "launch_container.sh"; } - public static Object nullToZero (Object v) { + public static double nullToZero (Double v) { return (v != null ? v : 0); } @@ -2071,6 +2073,16 @@ public class Utils { } /** + * a or b the first one that is not null + * @param a something + * @param b something else + * @return a or b the first one that is not null + */ + public static <V> V OR(V a, V b) { + return a == null ? b : a; + } + + /** * Writes a posix shell script file to be executed in its own process. * @param dir the directory under which the script is to be written * @param command the command the script is to execute http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index 13198fa..2be4a7a 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -17,7 +17,7 @@ (:import [java.util Arrays] [org.apache.storm.nimbus NimbusInfo]) (:import [org.apache.storm.daemon.common Assignment StormBase SupervisorInfo]) - (:import [org.apache.storm.generated NimbusSummary]) + (:import [org.apache.storm.generated NimbusSummary TopologyStatus]) (:import [org.apache.zookeeper ZooDefs ZooDefs$Ids Watcher$Event$EventType]) (:import [org.mockito Mockito]) (:import [org.mockito.exceptions.base MockitoAssertionError]) @@ -187,8 +187,8 @@ nimbusInfo2 (NimbusInfo. "nimbus2" 6667 false) nimbusSummary1 (NimbusSummary. "nimbus1" 6667 (Time/currentTimeSecs) false "v1") nimbusSummary2 (NimbusSummary. "nimbus2" 6667 (Time/currentTimeSecs) false "v2") - base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {}) - base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {})] + base1 (StormBase. "/tmp/storm1" 1 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {}) + base2 (StormBase. "/tmp/storm2" 2 {:type TopologyStatus/ACTIVE} 2 {} "" nil nil {})] (is (= [] (.assignments state nil))) (.setAssignment state "storm1" (thriftify-assignment assignment1)) (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm1" nil)))) @@ -336,4 +336,4 @@ cluster-utils (Mockito/mock ClusterUtils)] (with-open [mocked-cluster (MockedCluster. cluster-utils)] (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage)) - (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.)))))) \ No newline at end of file + (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.)))))) http://git-wip-us.apache.org/repos/asf/storm/blob/725003cc/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index a43ad8e..cc760a4 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -16,14 +16,15 @@ (ns org.apache.storm.nimbus-test (:use [clojure test]) (:require [org.apache.storm [util :as util]]) - (:require [org.apache.storm.daemon [nimbus :as nimbus]]) - (:require [org.apache.storm [converter :as converter]]) (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout TestPlannerBolt] + [org.apache.storm.blobstore BlobStore] [org.apache.storm.nimbus InMemoryTopologyActionNotifier] - [org.apache.storm.generated GlobalStreamId] + [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus] + [org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase] [org.apache.storm Thrift MockAutoCred] - [org.apache.storm.stats BoltExecutorStats StatsUtil]) + [org.apache.storm.stats BoltExecutorStats StatsUtil] + [org.apache.storm.security.auth IGroupMappingServiceProvider IAuthorizer]) (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) (:import [org.mockito Mockito Matchers]) @@ -34,8 +35,9 @@ TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions InvalidTopologyException AuthorizationException LogConfig LogLevel LogLevelAction Assignment NodeInfo]) - (:import [java.util HashMap]) + (:import [java.util HashMap HashSet Optional]) (:import [java.io File]) + (:import [javax.security.auth Subject]) (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate StormCommonInstaller] [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller]) (:import [org.apache.storm.zookeeper Zookeeper]) @@ -44,7 +46,7 @@ (:import [org.apache.storm.daemon StormCommon]) (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils]) (:use [org.apache.storm testing util config log converter]) - (:require [conjure.core] [org.apache.storm.daemon.nimbus :as nimbus]) + (:require [conjure.core]) (:use [conjure core])) @@ -277,7 +279,7 @@ ))) (defn isolation-nimbus [] - (let [standalone (nimbus/standalone-nimbus)] + (let [standalone (Nimbus$StandaloneINimbus.)] (reify INimbus (prepare [this conf local-dir] (.prepare standalone conf local-dir) @@ -499,14 +501,15 @@ ))) (deftest test-topo-history - (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5 - :daemon-conf {SUPERVISOR-ENABLE false - NIMBUS-ADMINS ["admin-user"] - NIMBUS-TASK-TIMEOUT-SECS 30 - NIMBUS-MONITOR-FREQ-SECS 10 - TOPOLOGY-ACKER-EXECUTORS 0}] - - (stubbing [nimbus/user-groups ["alice-group"]] + (let [group-mapper (Mockito/mock IGroupMappingServiceProvider)] + (with-simulated-time-local-cluster [cluster :supervisors 2 :ports-per-supervisor 5 + :group-mapper group-mapper + :daemon-conf {SUPERVISOR-ENABLE false + NIMBUS-ADMINS ["admin-user"] + NIMBUS-TASK-TIMEOUT-SECS 30 + NIMBUS-MONITOR-FREQ-SECS 10 + TOPOLOGY-ACKER-EXECUTORS 0}] + (.thenReturn (Mockito/when (.getGroups group-mapper (Mockito/anyObject))) #{"alice-group"}) (letlocals (bind conf (:daemon-conf cluster)) (bind topology (Thrift/buildTopology @@ -521,7 +524,7 @@ (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil)))) (.killTopology (:nimbus cluster) "test") ;; check that storm is deactivated but alive - (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type))) + (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type))) (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil)))) (advance-cluster-time cluster 35) ;; kill topology read on group @@ -532,7 +535,7 @@ (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil)))) (.killTopology (:nimbus cluster) "killgrouptest") ;; check that storm is deactivated but alive - (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id-killgroup nil)) :status :type))) + (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killgroup nil)) :status :type))) (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killgroup nil)))) (advance-cluster-time cluster 35) ;; kill topology can't read @@ -543,7 +546,7 @@ (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil)))) (.killTopology (:nimbus cluster) "killnoreadtest") ;; check that storm is deactivated but alive - (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id-killnoread nil)) :status :type))) + (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id-killnoread nil)) :status :type))) (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id-killnoread nil)))) (advance-cluster-time cluster 35) @@ -617,7 +620,7 @@ (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil)))) (.killTopology (:nimbus cluster) "test") ;; check that storm is deactivated but alive - (is (= :killed (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type))) + (is (= TopologyStatus/KILLED (-> (clojurify-storm-base (.stormBase state storm-id nil)) :status :type))) (is (not-nil? (clojurify-assignment (.assignmentInfo state storm-id nil)))) (advance-cluster-time cluster 18) ;; check that storm is deactivated but alive @@ -961,7 +964,7 @@ (is (thrown? InvalidTopologyException (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) - (.set_num_executors {"1" 0}) + (.set_num_executors {"1" (int 0)}) )))) ))) @@ -993,7 +996,7 @@ (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) - (.set_num_workers 6) + (.set_num_workers (int 6)) )) (advance-cluster-time cluster 29) (checker [2 2 2]) @@ -1002,7 +1005,7 @@ (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) - (.set_num_executors {"1" 1}) + (.set_num_executors {"1" (int 1)}) )) (advance-cluster-time cluster 29) (checker [1 1 1 1 1 1]) @@ -1011,7 +1014,7 @@ (.rebalance (:nimbus cluster) "test" (doto (RebalanceOptions.) - (.set_num_executors {"1" 8}) + (.set_num_executors {"1" (int 8)}) (.set_num_workers 4) )) (advance-cluster-time cluster 32) @@ -1137,24 +1140,6 @@ {TOPOLOGY-WORKERS 8} topology)))))) -(defnk mock-leader-elector [:is-leader true :leader-name "test-host" :leader-port 9999] - (let [leader-address (NimbusInfo. leader-name leader-port true)] - (reify ILeaderElector - (prepare [this conf] true) - (isLeader [this] is-leader) - (addToLeaderLockQueue [this] true) - (getLeader [this] leader-address) - (getAllNimbuses [this] `(leader-address)) - (close [this] true)))) - -;(deftest test-no-overlapping-slots -; ;; test that same node+port never appears across 2 assignments -; ) - -;(deftest test-stateless -; ;; test that nimbus can die and restart without any problems -; ) - (deftest test-clean-inbox "Tests that the inbox correctly cleans jar files." (with-simulated-time @@ -1176,15 +1161,15 @@ (doseq [fs [["a.jar" 20] ["b.jar" 20] ["c.jar" 0]]] (apply mk-file fs)) (assert-files-in-dir ["a.jar" "b.jar" "c.jar"]) - (nimbus/clean-inbox dir-location 10) + (Nimbus/cleanInbox dir-location 10) (assert-files-in-dir ["c.jar"]) ;; Cleanit again, c.jar should stay (advance-time-secs! 5) - (nimbus/clean-inbox dir-location 10) + (Nimbus/cleanInbox dir-location 10) (assert-files-in-dir ["c.jar"]) ;; Advance time, clean again, c.jar should be deleted. (advance-time-secs! 5) - (nimbus/clean-inbox dir-location 10) + (Nimbus/cleanInbox dir-location 10) (assert-files-in-dir []) )))) @@ -1209,7 +1194,8 @@ STORM-ZOOKEEPER-PORT zk-port STORM-LOCAL-DIR nimbus-dir})) (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) - (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) + (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) + (.launchServer nimbus) (bind topology (Thrift/buildTopology {"1" (Thrift/prepareSpoutDetails (TestPlannerSpout. true) (Integer. 3))} @@ -1220,7 +1206,8 @@ (letlocals (bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) - (bind non-leader-nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) + (bind non-leader-nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) + (.launchServer non-leader-nimbus) ;first we verify that the master nimbus can perform all actions, even with another nimbus present. (submit-local-topology nimbus "t1" {} topology) @@ -1274,136 +1261,122 @@ ) (deftest test-nimbus-iface-methods-check-authorization - (with-local-cluster [cluster - :daemon-conf {NIMBUS-AUTHORIZER - "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}] - (let [ - nimbus (:nimbus cluster) - topology (Thrift/buildTopology {} {}) - ] - ; Fake good authorization as part of setup. - (mocking [nimbus/check-authorization!] - (submit-local-topology-with-opts nimbus "test" {} topology - (SubmitOptions. TopologyInitialStatus/INACTIVE)) - ) - (stubbing [nimbus/storm-active? true] + (let [cluster-state (Mockito/mock IStormClusterState) + blob-store (Mockito/mock BlobStore)] + (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store + :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"}] + (let [nimbus (:nimbus cluster) + topology-name "test" + topology-id "test-id"] + (.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id)) (is (thrown? AuthorizationException - (.rebalance nimbus "test" (RebalanceOptions.)) - )) - ) - (is (thrown? AuthorizationException - (.activate nimbus "test") - )) - (is (thrown? AuthorizationException - (.deactivate nimbus "test") - )) - ) - ) -) + (.rebalance nimbus topology-name (RebalanceOptions.)))) + (is (thrown? AuthorizationException + (.activate nimbus topology-name))) + (is (thrown? AuthorizationException + (.deactivate nimbus topology-name))))))) (deftest test-nimbus-check-authorization-params - (with-local-cluster [cluster + (let [cluster-state (Mockito/mock IStormClusterState) + blob-store (Mockito/mock BlobStore) + mk-nimbus (fn + [conf inimbus blob-store leader-elector group-mapper cluster-state] + (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))] + (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}] (let [nimbus (:nimbus cluster) topology-name "test-nimbus-check-autho-params" - topology (Thrift/buildTopology {} {})] - - (submit-local-topology-with-opts nimbus topology-name {} topology - (SubmitOptions. TopologyInitialStatus/INACTIVE)) - - (let [expected-name topology-name - expected-conf {TOPOLOGY-NAME expected-name - "foo" "bar"}] - - (testing "getTopologyConf calls check-authorization! with the correct parameters." - (let [expected-operation "getTopologyConf" - expected-conf-json (JSONValue/toJSONString expected-conf)] - (stubbing [nimbus/check-authorization! nil - nimbus/try-read-storm-conf expected-conf] - (try - (is (= expected-conf - (->> (.getTopologyConf nimbus "fake-id") - JSONValue/parse - clojurify-structure))) - (catch NotAliveException e) - (finally - (verify-first-call-args-for-indices - nimbus/check-authorization! - [1 2 3] expected-name expected-conf expected-operation)))))) - - (testing "getTopology calls check-authorization! with the correct parameters." - (let [expected-operation "getTopology" - common-spy (->> - (proxy [StormCommon] [] - (systemTopologyImpl [conf topology] nil)) - Mockito/spy)] - (with-open [- (StormCommonInstaller. common-spy)] - (stubbing [nimbus/check-authorization! nil - nimbus/try-read-storm-conf expected-conf - nimbus/try-read-storm-topology nil] - (try - (.getTopology nimbus "fake-id") - (catch NotAliveException e) - (finally - (verify-first-call-args-for-indices - nimbus/check-authorization! - [1 2 3] expected-name expected-conf expected-operation) - (. (Mockito/verify common-spy) - (systemTopologyImpl (Matchers/eq expected-conf) - (Matchers/any))))))))) - - (testing "getUserTopology calls check-authorization with the correct parameters." - (let [expected-operation "getUserTopology"] - (stubbing [nimbus/check-authorization! nil - nimbus/try-read-storm-conf expected-conf - nimbus/try-read-storm-topology nil] - (try - (.getUserTopology nimbus "fake-id") - (catch NotAliveException e) - (finally - (verify-first-call-args-for-indices - nimbus/check-authorization! - [1 2 3] expected-name expected-conf expected-operation) - (verify-first-call-args-for-indices - nimbus/try-read-storm-topology [0] "fake-id")))))) - - (testing "getSupervisorPageInfo only calls check-authorization as getTopology" - (let [expected-operation "getTopology" - assignment (doto (Assignment.) - (.set_executor_node_port {[1 1] (NodeInfo. "super1" #{1}), - [2 2] (NodeInfo. "super2" #{2})})) - clojurified-assignment (clojurify-assignment assignment) - topo-assignment {expected-name assignment} - check-auth-state (atom []) - mock-check-authorization (fn [nimbus storm-name storm-conf operation] - (swap! check-auth-state conj {:nimbus nimbus - :storm-name storm-name - :storm-conf storm-conf - :operation operation}))] - (stubbing [nimbus/check-authorization! mock-check-authorization - nimbus/try-read-storm-conf expected-conf - nimbus/try-read-storm-topology nil - nimbus/get-clojurified-task-info {} - nimbus/all-supervisor-info {"super1" {:hostname "host1", :meta [1234], :uptime-secs 123} - "super2" {:hostname "host2", :meta [1234], :uptime-secs 123}} - clojurify-assignment clojurified-assignment - nimbus/topology-assignments topo-assignment - nimbus/get-launch-time-secs 0] - ;; not called yet - (verify-call-times-for nimbus/check-authorization! 0) - (.getSupervisorPageInfo nimbus "super1" nil true) - - ;; afterwards, it should get called twice - (verify-call-times-for nimbus/check-authorization! 2) - (let [first-call (nth @check-auth-state 0) - second-call (nth @check-auth-state 1)] - (is (= expected-name (:storm-name first-call))) - (is (= expected-conf (:storm-conf first-call))) - (is (= "getTopology" (:operation first-call))) + topology-id "fake-id" + topology (Thrift/buildTopology {} {}) + expected-name topology-name + expected-conf {TOPOLOGY-NAME expected-name + "foo" "bar"}] + (.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id)) + (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf) + (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil) + (testing "getTopologyConf calls check-authorization! with the correct parameters." + (let [expected-operation "getTopologyConf" + expected-conf-json (JSONValue/toJSONString expected-conf)] + (try + (is (= expected-conf + (->> (.getTopologyConf nimbus topology-id) + JSONValue/parse + clojurify-structure))) + (catch NotAliveException e) + (finally + (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation))))) + + (testing "getTopology calls check-authorization! with the correct parameters." + (let [expected-operation "getTopology" + common-spy (->> + (proxy [StormCommon] [] + (systemTopologyImpl [conf topology] nil)) + Mockito/spy)] + (with-open [- (StormCommonInstaller. common-spy)] + (try + (.getTopology nimbus topology-id) + (catch NotAliveException e) + (finally + (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation) + (. (Mockito/verify common-spy) + (systemTopologyImpl (Matchers/eq expected-conf) + (Matchers/any)))))))) + + (testing "getUserTopology calls check-authorization with the correct parameters." + (let [expected-operation "getUserTopology"] + (try + (.getUserTopology nimbus topology-id) + (catch NotAliveException e) + (finally + (.checkAuthorization (Mockito/verify nimbus) topology-name expected-conf expected-operation) + ;;One for this time and one for getTopology call + (.readTopology (Mockito/verify blob-store (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject)))))))))) + +(deftest test-check-authorization-getSupervisorPageInfo + (let [cluster-state (Mockito/mock IStormClusterState) + blob-store (Mockito/mock BlobStore) + mk-nimbus (fn + [conf inimbus blob-store leader-elector group-mapper cluster-state] + (Mockito/spy (mk-nimbus conf inimbus blob-store leader-elector group-mapper cluster-state)))] + (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store :mk-nimbus mk-nimbus + :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}] + (let [nimbus (:nimbus cluster) + expected-name "test-nimbus-check-autho-params" + expected-conf {TOPOLOGY-NAME expected-name + TOPOLOGY-WORKERS 1 + TOPOLOGY-MESSAGE-TIMEOUT-SECS 30 + "foo" "bar"} + expected-operation "getTopology" + assignment (doto (Assignment.) + (.set_executor_node_port {[1 1] (NodeInfo. "super1" #{1}), + [2 2] (NodeInfo. "super2" #{2})})) + topology (doto (StormTopology. ) + (.set_spouts {}) + (.set_bolts {}) + (.set_state_spouts {})) + clojurified-assignment (clojurify-assignment assignment) + topo-assignment {expected-name assignment} + check-auth-state (atom []) + mock-check-authorization (fn [nimbus storm-name storm-conf operation] + (swap! check-auth-state conj {:nimbus nimbus + :storm-name storm-name + :storm-conf storm-conf + :operation operation})) + all-supervisors (doto (HashMap.) + (.put "super1" (doto (SupervisorInfo.) (.set_hostname "host1") (.set_meta [(long 1234)]) + (.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {}))) + (.put "super2" (doto (SupervisorInfo.) (.set_hostname "host2") (.set_meta [(long 1234)]) + (.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {}))))] + (.thenReturn (Mockito/when (.allSupervisorInfo cluster-state)) all-supervisors) + (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) expected-conf) + (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology) + (.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment) + (stubbing [clojurify-assignment clojurified-assignment] + (.getSupervisorPageInfo nimbus "super1" nil true) - (is (= expected-name (:storm-name second-call))) - (is (= expected-conf (:storm-conf second-call))) - (is (= "getSupervisorPageInfo" (:operation second-call))))))))))) + ;; afterwards, it should get called twice + (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getSupervisorPageInfo") + (.checkAuthorization (Mockito/verify (:nimbus cluster)) expected-name expected-conf "getTopology")))))) (deftest test-nimbus-iface-getTopology-methods-throw-correctly (with-local-cluster [cluster] @@ -1444,52 +1417,56 @@ ) (deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases - (with-local-cluster [cluster] - (let [ - nimbus (:nimbus cluster) + (let [cluster-state (Mockito/mock IStormClusterState) + blob-store (Mockito/mock BlobStore)] + (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store] + (let [nimbus (:nimbus cluster) bogus-secs 42 - bogus-type "bogusType" + bogus-type TopologyStatus/ACTIVE bogus-bases { "1" nil - "2" {:launch-time-secs bogus-secs + "2" (thriftify-storm-base {:launch-time-secs bogus-secs :storm-name "id2-name" - :status {:type bogus-type}} + :status {:type bogus-type}}) "3" nil - "4" {:launch-time-secs bogus-secs + "4" (thriftify-storm-base {:launch-time-secs bogus-secs :storm-name "id4-name" - :status {:type bogus-type}} + :status {:type bogus-type}}) } + topo-name "test-topo" + topo-conf {TOPOLOGY-NAME topo-name + TOPOLOGY-WORKERS 1 + TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} + storm-base (StormBase. ) + topology (doto (StormTopology. ) + (.set_spouts {}) + (.set_bolts {}) + (.set_state_spouts {})) ] - (stubbing [nimbus/get-resources-for-topology nil - nimbus/nimbus-topology-bases bogus-bases - nimbus/get-blob-replication-count 1] - (let [topos (.get_topologies (.getClusterInfo nimbus))] - ; The number of topologies in the summary is correct. - (is (= (count - (filter (fn [b] (second b)) bogus-bases)) (count topos))) - ; Each topology present has a valid name. - (is (empty? - (filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos))) - ; The topologies are those with valid bases. - (is (empty? - (filter (fn [t] - (or - (nil? t) - (not (number? (read-string (.get_id t)))) - (odd? (read-string (.get_id t))) - )) topos))) - ) + (.thenReturn (Mockito/when (.stormBase cluster-state (Mockito/any String) (Mockito/anyObject))) storm-base) + (.thenReturn (Mockito/when (.topologyBases cluster-state)) bogus-bases) + (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) topo-conf) + (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology) + + (let [topos (.get_topologies (.getClusterInfo nimbus))] + ; The number of topologies in the summary is correct. + (is (= (count + (filter (fn [b] (second b)) bogus-bases)) (count topos))) + ; Each topology present has a valid name. + (is (empty? + (filter (fn [t] (or (nil? t) (nil? (.get_name t)))) topos))) + ; The topologies are those with valid bases. + (is (empty? + (filter (fn [t] + (or + (nil? t) + (not (number? (read-string (.get_id t)))) + (odd? (read-string (.get_id t))) + )) topos))) ) ) ) -) - -(deftest test-defserverfn-numbus-iface-instance - (test-nimbus-iface-submitTopologyWithOpts-checks-authorization) - (test-nimbus-iface-methods-check-authorization) - (test-nimbus-iface-getTopology-methods-throw-correctly) - (test-nimbus-iface-getClusterInfo-filters-topos-without-bases) -) +)) (deftest test-nimbus-data-acls (testing "nimbus-data uses correct ACLs" @@ -1500,8 +1477,8 @@ STORM-ZOOKEEPER-AUTH-PAYLOAD digest STORM-PRINCIPAL-TO-LOCAL-PLUGIN "org.apache.storm.security.auth.DefaultPrincipalToLocal" NIMBUS-THRIFT-PORT 6666}) - expected-acls nimbus/NIMBUS-ZK-ACLS - fake-inimbus (reify INimbus (getForcedScheduler [this] nil)) + expected-acls Nimbus/ZK_ACLS + fake-inimbus (reify INimbus (getForcedScheduler [this] nil) (prepare [this conf dir] nil)) fake-cu (proxy [ConfigUtils] [] (nimbusTopoHistoryStateImpl [conf] nil)) fake-utils (proxy [Utils] [] @@ -1517,13 +1494,9 @@ zk-le (MockedZookeeper. (proxy [Zookeeper] [] (zkLeaderElectorImpl [conf blob-store] nil))) mocked-cluster (MockedCluster. cluster-utils)] - (stubbing [nimbus/file-cache-map nil - nimbus/mk-blob-cache-map nil - nimbus/mk-bloblist-cache-map nil - nimbus/mk-scheduler nil] - (nimbus/nimbus-data auth-conf fake-inimbus) + (Nimbus. auth-conf fake-inimbus) (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any)) - ))))) + )))) (deftest test-file-bogus-download (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] @@ -1534,12 +1507,14 @@ ))) (deftest test-validate-topo-config-on-submit - (with-local-cluster [cluster] - (let [nimbus (:nimbus cluster) - topology (Thrift/buildTopology {} {}) - bad-config {"topology.isolate.machines" "2"}] - ; Fake good authorization as part of setup. - (mocking [nimbus/check-authorization!] + (let [cluster-state (Mockito/mock IStormClusterState) + blob-store (Mockito/mock BlobStore)] + (.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty)) + (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store + :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}] + (let [nimbus (:nimbus cluster) + topology (Thrift/buildTopology {} {}) + bad-config {"topology.isolate.machines" "2"}] (is (thrown-cause? InvalidTopologyException (submit-local-topology-with-opts nimbus "test" bad-config topology (SubmitOptions.)))))))) @@ -1555,7 +1530,8 @@ STORM-ZOOKEEPER-PORT zk-port STORM-LOCAL-DIR nimbus-dir})) (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) - (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) + (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) + (.launchServer nimbus) (Time/sleepSecs 1) (bind topology (Thrift/buildTopology {"1" (Thrift/prepareSpoutDetails @@ -1570,7 +1546,8 @@ ; in startup of nimbus it reads cluster state and take proper actions ; in this case nimbus registers topology transition event to scheduler again ; before applying STORM-856 nimbus was killed with NPE - (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) + (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) + (.launchServer nimbus) (.shutdown nimbus) (.disconnect cluster-state) )))) @@ -1588,7 +1565,8 @@ STORM-LOCAL-DIR nimbus-dir NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)})) (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) - (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) + (bind nimbus (mk-nimbus conf (Nimbus$StandaloneINimbus.) nil nil nil nil)) + (.launchServer nimbus) (bind notifier (InMemoryTopologyActionNotifier.)) (Time/sleepSecs 1) (bind topology (Thrift/buildTopology @@ -1638,55 +1616,73 @@ ;; if the user sends an empty log config, nimbus will say that all ;; log configs it contains are LogLevelAction/UNCHANGED (deftest empty-save-config-results-in-all-unchanged-actions - (with-local-cluster [cluster] - (let [nimbus (:nimbus cluster) - previous-config (LogConfig.) - level (LogLevel.) - mock-config (LogConfig.)] - ;; send something with content to nimbus beforehand - (.set_target_log_level level "ERROR") - (.set_action level LogLevelAction/UPDATE) - (.put_to_named_logger_level previous-config "test" level) - (stubbing [nimbus/check-storm-active! nil - nimbus/try-read-storm-conf {}] - (.setLogConfig nimbus "foo" previous-config) + (let [cluster-state (Mockito/mock IStormClusterState) + blob-store (Mockito/mock BlobStore)] + (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store + :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}] + (let [nimbus (:nimbus cluster) + previous-config (LogConfig.) + mock-config (LogConfig.) + expected-config (LogConfig.)] + ;; send something with content to nimbus beforehand + (.put_to_named_logger_level previous-config "test" + (doto (LogLevel.) + (.set_target_log_level "ERROR") + (.set_action LogLevelAction/UPDATE))) + + (.put_to_named_logger_level expected-config "test" + (doto (LogLevel.) + (.set_target_log_level "ERROR") + (.set_action LogLevelAction/UNCHANGED))) + + (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {}) + (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config) + (.setLogConfig nimbus "foo" mock-config) - (let [saved-config (.getLogConfig nimbus "foo") - levels (.get_named_logger_level saved-config)] - (is (= (.get_action (.get levels "test")) LogLevelAction/UNCHANGED))))))) + (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config)))))) (deftest log-level-update-merges-and-flags-existent-log-level - (with-local-cluster [cluster] - (stubbing [nimbus/check-storm-active! nil - nimbus/try-read-storm-conf {}] + (let [cluster-state (Mockito/mock IStormClusterState) + blob-store (Mockito/mock BlobStore)] + (with-mocked-nimbus [cluster :cluster-state cluster-state :blob-store blob-store + :daemon-conf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"}] (let [nimbus (:nimbus cluster) previous-config (LogConfig.) - level (LogLevel.) - other-level (LogLevel.) - mock-config (LogConfig.)] + mock-config (LogConfig.) + expected-config (LogConfig.)] ;; send something with content to nimbus beforehand - (.set_target_log_level level "ERROR") - (.set_action level LogLevelAction/UPDATE) - (.put_to_named_logger_level previous-config "test" level) + (.put_to_named_logger_level previous-config "test" + (doto (LogLevel.) + (.set_target_log_level "ERROR") + (.set_action LogLevelAction/UPDATE))) + + (.put_to_named_logger_level previous-config "other-test" + (doto (LogLevel.) + (.set_target_log_level "DEBUG") + (.set_action LogLevelAction/UPDATE))) - (.set_target_log_level other-level "DEBUG") - (.set_action other-level LogLevelAction/UPDATE) - (.put_to_named_logger_level previous-config "other-test" other-level) - (.setLogConfig nimbus "foo" previous-config) ;; only change "test" - (.set_target_log_level level "INFO") - (.set_action level LogLevelAction/UPDATE) - (.put_to_named_logger_level mock-config "test" level) + (.put_to_named_logger_level mock-config "test" + (doto (LogLevel.) + (.set_target_log_level "INFO") + (.set_action LogLevelAction/UPDATE))) + + (.put_to_named_logger_level expected-config "test" + (doto (LogLevel.) + (.set_target_log_level "INFO") + (.set_action LogLevelAction/UPDATE))) + + (.put_to_named_logger_level expected-config "other-test" + (doto (LogLevel.) + (.set_target_log_level "DEBUG") + (.set_action LogLevelAction/UNCHANGED))) + + (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {}) + (.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config) + (.setLogConfig nimbus "foo" mock-config) - - (let [saved-config (.getLogConfig nimbus "foo") - levels (.get_named_logger_level saved-config)] - (is (= (.get_action (.get levels "test")) LogLevelAction/UPDATE)) - (is (= (.get_target_log_level (.get levels "test")) "INFO")) - - (is (= (.get_action (.get levels "other-test")) LogLevelAction/UNCHANGED)) - (is (= (.get_target_log_level (.get levels "other-test")) "DEBUG"))))))) + (.setTopologyLogConfig (Mockito/verify cluster-state) (Mockito/any String) (Mockito/eq expected-config)))))) (defn teardown-heartbeats [id]) (defn teardown-topo-errors [id]) @@ -1708,120 +1704,106 @@ (backpressureTopologies [this] bp-topos)))) (deftest cleanup-storm-ids-returns-inactive-topos - (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3"))] - (stubbing [nimbus/is-leader true - nimbus/code-ids {}] - (is (= (nimbus/cleanup-storm-ids mock-state nil) #{"topo2" "topo3"}))))) + (let [mock-state (mock-cluster-state (list "topo1") (list "topo1" "topo2" "topo3")) + store (Mockito/mock BlobStore)] + (.thenReturn (Mockito/when (.storedTopoIds store)) #{}) + (is (= (Nimbus/topoIdsToClean mock-state store) #{"topo2" "topo3"})))) (deftest cleanup-storm-ids-performs-union-of-storm-ids-with-active-znodes (let [active-topos (list "hb1" "e2" "bp3") hb-topos (list "hb1" "hb2" "hb3") error-topos (list "e1" "e2" "e3") bp-topos (list "bp1" "bp2" "bp3") - mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)] - (stubbing [nimbus/is-leader true - nimbus/code-ids {}] - (is (= (nimbus/cleanup-storm-ids mock-state nil) - #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"}))))) + mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos) + store (Mockito/mock BlobStore)] + (.thenReturn (Mockito/when (.storedTopoIds store)) #{}) + (is (= (Nimbus/topoIdsToClean mock-state store) + #{"hb2" "hb3" "e1" "e3" "bp1" "bp2"})))) (deftest cleanup-storm-ids-returns-empty-set-when-all-topos-are-active (let [active-topos (list "hb1" "hb2" "hb3" "e1" "e2" "e3" "bp1" "bp2" "bp3") hb-topos (list "hb1" "hb2" "hb3") error-topos (list "e1" "e2" "e3") bp-topos (list "bp1" "bp2" "bp3") - mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos)] - (stubbing [nimbus/is-leader true - nimbus/code-ids {}] - (is (= (nimbus/cleanup-storm-ids mock-state nil) - #{}))))) + mock-state (mock-cluster-state active-topos hb-topos error-topos bp-topos) + store (Mockito/mock BlobStore)] + (.thenReturn (Mockito/when (.storedTopoIds store)) #{}) + (is (= (Nimbus/topoIdsToClean mock-state store) + #{})))) (deftest do-cleanup-removes-inactive-znodes (let [inactive-topos (list "topo2" "topo3") - hb-cache (atom (into {}(map vector inactive-topos '(nil nil)))) + hb-cache (into {}(map vector inactive-topos '(nil nil))) mock-state (mock-cluster-state) - mock-blob-store {} - conf {} - nimbus {:conf conf - :submit-lock mock-blob-store - :blob-store {} - :storm-cluster-state mock-state - :heartbeats-cache hb-cache}] - - (stubbing [nimbus/is-leader true - nimbus/blob-rm-topology-keys nil - nimbus/cleanup-storm-ids inactive-topos] - (mocking - [teardown-heartbeats - teardown-topo-errors - teardown-backpressure-dirs - nimbus/force-delete-topo-dist-dir - nimbus/blob-rm-topology-keys - nimbus/blob-rm-dependency-jars-in-topology] - - (nimbus/do-cleanup nimbus) - - ;; removed heartbeats znode - (verify-nth-call-args-for 1 teardown-heartbeats "topo2") - (verify-nth-call-args-for 2 teardown-heartbeats "topo3") - - ;; removed topo errors znode - (verify-nth-call-args-for 1 teardown-topo-errors "topo2") - (verify-nth-call-args-for 2 teardown-topo-errors "topo3") - - ;; removed backpressure znodes - (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2") - (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3") - - ;; removed topo directories - (verify-nth-call-args-for 1 nimbus/force-delete-topo-dist-dir conf "topo2") - (verify-nth-call-args-for 2 nimbus/force-delete-topo-dist-dir conf "topo3") - - ;; removed blob store topo keys - (verify-nth-call-args-for 1 nimbus/blob-rm-topology-keys "topo2" mock-blob-store mock-state) - (verify-nth-call-args-for 2 nimbus/blob-rm-topology-keys "topo3" mock-blob-store mock-state) - - ;; removed topology dependencies - (verify-nth-call-args-for 1 nimbus/blob-rm-dependency-jars-in-topology "topo2" mock-blob-store mock-state) - (verify-nth-call-args-for 2 nimbus/blob-rm-dependency-jars-in-topology "topo3" mock-blob-store mock-state) - - ;; remove topos from heartbeat cache - (is (= (count @hb-cache) 0)))))) + mock-blob-store (Mockito/mock BlobStore) + conf {}] + (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] + (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))] + (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))] + (.set (.getHeartbeatsCache nimbus) hb-cache) + (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos)) + (mocking + [teardown-heartbeats + teardown-topo-errors + teardown-backpressure-dirs] + + (.doCleanup nimbus) + + ;; removed heartbeats znode + (verify-nth-call-args-for 1 teardown-heartbeats "topo2") + (verify-nth-call-args-for 2 teardown-heartbeats "topo3") + + ;; removed topo errors znode + (verify-nth-call-args-for 1 teardown-topo-errors "topo2") + (verify-nth-call-args-for 2 teardown-topo-errors "topo3") + + ;; removed backpressure znodes + (verify-nth-call-args-for 1 teardown-backpressure-dirs "topo2") + (verify-nth-call-args-for 2 teardown-backpressure-dirs "topo3") + + ;; removed topo directories + (.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo2") + (.forceDeleteTopoDistDir (Mockito/verify nimbus) "topo3") + + ;; removed blob store topo keys + (.rmTopologyKeys (Mockito/verify nimbus) "topo2") + (.rmTopologyKeys (Mockito/verify nimbus) "topo3") + + ;; removed topology dependencies + (.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo2") + (.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo3") + + ;; remove topos from heartbeat cache + (is (= (count (.get (.getHeartbeatsCache nimbus))) 0))))))) (deftest do-cleanup-does-not-teardown-active-topos (let [inactive-topos () - hb-cache (atom {"topo1" nil "topo2" nil}) + hb-cache {"topo1" nil "topo2" nil} mock-state (mock-cluster-state) - mock-blob-store {} - conf {} - nimbus {:conf conf - :submit-lock mock-blob-store - :blob-store {} - :storm-cluster-state mock-state - :heartbeats-cache hb-cache}] - - (stubbing [nimbus/is-leader true - nimbus/blob-rm-topology-keys nil - nimbus/cleanup-storm-ids inactive-topos] - (mocking - [teardown-heartbeats - teardown-topo-errors - teardown-backpressure-dirs - nimbus/force-delete-topo-dist-dir - nimbus/blob-rm-topology-keys] - - (nimbus/do-cleanup nimbus) - - (verify-call-times-for teardown-heartbeats 0) - (verify-call-times-for teardown-topo-errors 0) - (verify-call-times-for teardown-backpressure-dirs 0) - (verify-call-times-for nimbus/force-delete-topo-dist-dir 0) - (verify-call-times-for nimbus/blob-rm-topology-keys 0) - - ;; hb-cache goes down to 1 because only one topo was inactive - (is (= (count @hb-cache) 2)) - (is (contains? @hb-cache "topo1")) - (is (contains? @hb-cache "topo2")))))) - + mock-blob-store (Mockito/mock BlobStore) + conf {}] + (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] + (zkLeaderElectorImpl [conf blob-store] (mock-leader-elector))))] + (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))] + (.set (.getHeartbeatsCache nimbus) hb-cache) + (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos)) + (mocking + [teardown-heartbeats + teardown-topo-errors + teardown-backpressure-dirs] + + (.doCleanup nimbus) + + (verify-call-times-for teardown-heartbeats 0) + (verify-call-times-for teardown-topo-errors 0) + (verify-call-times-for teardown-backpressure-dirs 0) + (.forceDeleteTopoDistDir (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject)) + (.rmTopologyKeys (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject)) + + ;; hb-cache goes down to 1 because only one topo was inactive + (is (= (count (.get (.getHeartbeatsCache nimbus))) 2)) + (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo1")) + (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo2"))))))) (deftest user-topologies-for-supervisor (let [assignment (doto (Assignment.) @@ -1830,18 +1812,18 @@ assignment2 (doto (Assignment.) (.set_executor_node_port {[1 1] (NodeInfo. "super2" #{2}), [2 2] (NodeInfo. "super2" #{2})})) - assignments {"topo1" assignment, "topo2" assignment2}] - (stubbing [nimbus/is-authorized? true] - (let [topos1 (nimbus/user-and-supervisor-topos nil nil nil assignments "super1") - topos2 (nimbus/user-and-supervisor-topos nil nil nil assignments "super2")] - (is (= (list "topo1") (:supervisor-topologies topos1))) - (is (= #{"topo1"} (:user-topologies topos1))) - (is (= (list "topo1" "topo2") (:supervisor-topologies topos2))) - (is (= #{"topo1" "topo2"} (:user-topologies topos2))))))) - -(defn- mock-check-auth - [nimbus conf blob-store op topo-name] - (= topo-name "authorized")) + assignments {"topo1" assignment, "topo2" assignment2} + mock-state (mock-cluster-state) + mock-blob-store (Mockito/mock BlobStore) + nimbus (Nimbus. {} nil mock-state nil mock-blob-store (mock-leader-elector) nil)] + (let [supervisor1-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1")) + user1-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor1-topologies)) + supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2")) + user2-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor2-topologies))] + (is (= (list "topo1") supervisor1-topologies)) + (is (= #{"topo1"} user1-topologies)) + (is (= (list "topo1" "topo2") supervisor2-topologies)) + (is (= #{"topo1" "topo2"} user2-topologies))))) (deftest user-topologies-for-supervisor-with-unauthorized-user (let [assignment (doto (Assignment.) @@ -1850,8 +1832,15 @@ assignment2 (doto (Assignment.) (.set_executor_node_port {[1 1] (NodeInfo. "super1" #{2}), [2 2] (NodeInfo. "super2" #{2})})) - assignments {"topo1" assignment, "authorized" assignment2}] - (stubbing [nimbus/is-authorized? mock-check-auth] - (let [topos (nimbus/user-and-supervisor-topos nil nil nil assignments "super1")] - (is (= (list "topo1" "authorized") (:supervisor-topologies topos))) - (is (= #{"authorized"} (:user-topologies topos))))))) + assignments {"topo1" assignment, "authorized" assignment2} + mock-state (mock-cluster-state) + mock-blob-store (Mockito/mock BlobStore) + nimbus (Nimbus. {} nil mock-state nil mock-blob-store (mock-leader-elector) nil)] + (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"}) + (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"}) + (.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME))))) + (let [supervisor-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1")) + user-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor-topologies))] + + (is (= (list "topo1" "authorized") supervisor-topologies)) + (is (= #{"authorized"} user-topologies)))))