http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java new file mode 100644 index 0000000..cd2bc4a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -0,0 +1,664 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.storm.cluster;

import clojure.lang.*;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.*;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.generated.*;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.Zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
 * {@link StormClusterState} implementation that stores all cluster state through a {@link StateStorage}.
 *
 * <p>Read methods that accept a callback remember it (per topology id, or in a single slot for
 * subtree-wide data) and ask the storage layer to watch the corresponding path. When the storage
 * layer reports a change, the remembered callback is removed and invoked once; callers that want
 * further notifications have to pass a callback again on their next read.
 */
public class StormClusterStateImpl implements StormClusterState {

    private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);

    private final StateStorage stateStorage;

    // Callbacks keyed by topology id; each entry is removed when it is fired.
    private final ConcurrentHashMap<String, IFn> assignmentInfoCallback = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, IFn> assignmentVersionCallback = new ConcurrentHashMap<>();
    private final AtomicReference<IFn> supervisorsCallback = new AtomicReference<>();
    // we want to register a topo directory getChildren callback for all workers of this dir
    private final ConcurrentHashMap<String, IFn> backPressureCallback = new ConcurrentHashMap<>();
    private final AtomicReference<IFn> assignmentsCallback = new AtomicReference<>();
    private final ConcurrentHashMap<String, IFn> stormBaseCallback = new ConcurrentHashMap<>();
    private final AtomicReference<IFn> blobstoreCallback = new AtomicReference<>();
    private final ConcurrentHashMap<String, IFn> credentialsCallback = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, IFn> logConfigCallback = new ConcurrentHashMap<>();

    private final List<ACL> acls;
    private final String stateId;
    private final boolean solo;

    /**
     * Registers a change listener on the given storage and creates the top-level subtrees.
     *
     * @param stateStorage storage backend used for every read and write
     * @param acls         ACLs applied to every node this instance creates or writes
     *                     (except credentials, see {@link #setCredentials})
     * @param context      not used by this implementation
     * @param solo         when {@code true}, {@link #disconnect()} also closes {@code stateStorage}
     * @throws Exception declared for callers; propagated from the storage layer
     */
    public StormClusterStateImpl(StateStorage stateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
        this.stateStorage = stateStorage;
        this.solo = solo;
        // The field must be set here: every write method below reads this.acls.
        this.acls = acls;

        stateId = this.stateStorage.register(new ZKStateChangedCallback() {

            @Override
            public void changed(Watcher.Event.EventType type, String path) {
                List<String> toks = Zookeeper.tokenizePath(path);
                int size = toks.size();
                if (size < 1) {
                    return;
                }
                String root = toks.get(0);
                if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
                    if (size == 1) {
                        // set null and get the old value
                        issueCallback(assignmentsCallback);
                    } else {
                        String stormId = toks.get(1);
                        issueMapCallback(assignmentInfoCallback, stormId);
                        issueMapCallback(assignmentVersionCallback, stormId);
                        issueMapCallback(assignmentInfoWithVersionCallback, stormId);
                    }
                } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
                    issueCallback(supervisorsCallback);
                } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
                    issueCallback(blobstoreCallback);
                } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
                    issueMapCallback(stormBaseCallback, toks.get(1));
                } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
                    issueMapCallback(credentialsCallback, toks.get(1));
                } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
                    issueMapCallback(logConfigCallback, toks.get(1));
                } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
                    // Backpressure events must fire the backpressure callbacks, not the log-config ones.
                    issueMapCallback(backPressureCallback, toks.get(1));
                } else {
                    LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
                    Runtime.getRuntime().exit(30);
                }
            }

        });

        String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE, ClusterUtils.STORMS_SUBTREE, ClusterUtils.SUPERVISORS_SUBTREE, ClusterUtils.WORKERBEATS_SUBTREE,
                ClusterUtils.ERRORS_SUBTREE, ClusterUtils.BLOBSTORE_SUBTREE, ClusterUtils.NIMBUSES_SUBTREE, ClusterUtils.LOGCONFIG_SUBTREE };
        for (String path : pathlist) {
            this.stateStorage.mkdirs(path, acls);
        }
    }

    /**
     * Atomically clears the given callback slot and invokes the callback that was stored, if any.
     *
     * @param cb slot holding at most one pending callback
     */
    protected void issueCallback(AtomicReference<IFn> cb) {
        IFn callback = cb.getAndSet(null);
        if (callback != null) {
            callback.invoke();
        }
    }

    /**
     * Removes the callback stored under {@code key} and invokes it, if one was present.
     *
     * @param callbackConcurrentHashMap map of pending callbacks
     * @param key                       key whose callback should fire
     */
    protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
        IFn callback = callbackConcurrentHashMap.remove(key);
        if (callback != null) {
            callback.invoke();
        }
    }

    /**
     * Lists the children of the assignments subtree.
     *
     * @param callback if non-null, stored and a watch is requested on the subtree
     */
    @Override
    public List<String> assignments(IFn callback) {
        if (callback != null) {
            assignmentsCallback.set(callback);
        }
        return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
    }

    /**
     * Reads and deserializes the assignment of a topology.
     *
     * @param callback if non-null, stored under {@code stormId} and a watch is requested
     */
    @Override
    public Assignment assignmentInfo(String stormId, IFn callback) {
        if (callback != null) {
            assignmentInfoCallback.put(stormId, callback);
        }
        byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
        return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
    }

    /**
     * Reads the assignment of a topology together with its version.
     *
     * @param callback if non-null, stored under {@code stormId} and a watch is requested
     * @return a map with keys {@code :data} (the deserialized assignment) and {@code :version};
     *         when the storage returns no map, {@code :data} is {@code null} and {@code :version} is 0
     */
    @Override
    public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
        if (callback != null) {
            assignmentInfoWithVersionCallback.put(stormId, callback);
        }
        Assignment assignment = null;
        Integer version = 0;
        APersistentMap stored = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
        if (stored != null) {
            assignment = ClusterUtils.maybeDeserialize((byte[]) stored.get(RT.keyword(null, "data")), Assignment.class);
            version = (Integer) stored.get(RT.keyword(null, "version"));
        }
        return new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
    }

    /**
     * Reads the version of a topology's assignment node.
     *
     * @param callback if non-null, stored under {@code stormId} and a watch is requested
     */
    @Override
    public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
        if (callback != null) {
            assignmentVersionCallback.put(stormId, callback);
        }
        return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
    }

    /** Syncs the blobstore path of {@code blobKey} and then lists its children (no watch). */
    @Override
    public List<String> blobstoreInfo(String blobKey) {
        String path = ClusterUtils.blobstorePath(blobKey);
        stateStorage.sync_path(path);
        return stateStorage.get_children(path, false);
    }

    /**
     * Reads one {@link NimbusSummary} per child of the nimbuses subtree.
     * An entry is added for every child, so the list contains whatever
     * {@code ClusterUtils.maybeDeserialize} returns for each node.
     */
    @Override
    public List nimbuses() {
        List<NimbusSummary> nimbusSummaries = new ArrayList<>();
        List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
        for (String nimbusId : nimbusIds) {
            byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
            nimbusSummaries.add(ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class));
        }
        return nimbusSummaries;
    }

    /**
     * Publishes a nimbus entry as an ephemeral node and re-publishes it whenever the
     * connection state changes to {@code RECONNECTED}.
     */
    @Override
    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
        // explicit delete for ephemeral node to ensure this session creates the entry.
        stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
        stateStorage.add_listener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
                if (connectionState.equals(ConnectionState.RECONNECTED)) {
                    LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
                    stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
                }
            }
        });

        stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
    }

    /** Lists the children of the storms subtree (no watch). */
    @Override
    public List<String> activeStorms() {
        return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
    }

    /**
     * Reads and deserializes the {@link StormBase} of a topology.
     *
     * @param callback if non-null, stored under {@code stormId} and a watch is requested
     */
    @Override
    public StormBase stormBase(String stormId, IFn callback) {
        if (callback != null) {
            stormBaseCallback.put(stormId, callback);
        }
        byte[] serialized = stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null);
        return ClusterUtils.maybeDeserialize(serialized, StormBase.class);
    }

    /** Reads and deserializes the heartbeat written for the worker at {@code node}/{@code port}. */
    @Override
    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
        byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
        return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
    }

    /** Returns the topology's profile requests whose {@link NodeInfo} equals {@code nodeInfo}. */
    @Override
    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) {
        List<ProfileRequest> requests = new ArrayList<>();
        for (ProfileRequest profileRequest : getTopologyProfileRequests(stormId, isThrift)) {
            if (profileRequest.get_nodeInfo().equals(nodeInfo)) {
                requests.add(profileRequest);
            }
        }
        return requests;
    }

    /**
     * Reads every profile request stored under the topology's profiler-config path.
     * Children that do not deserialize to a request are skipped; {@code isThrift} is not used here.
     */
    @Override
    public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) {
        List<ProfileRequest> profileRequests = new ArrayList<>();
        String path = ClusterUtils.profilerConfigPath(stormId);
        if (stateStorage.node_exists(path, false)) {
            for (String child : stateStorage.get_children(path, false)) {
                String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
                byte[] raw = stateStorage.get_data(childPath, false);
                ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
                if (request != null) {
                    profileRequests.add(request);
                }
            }
        }
        return profileRequests;
    }

    /** Stores {@code profileRequest} at the path derived from its host, first port and action. */
    @Override
    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
        ProfileAction profileAction = profileRequest.get_action();
        String host = profileRequest.get_nodeInfo().get_node();
        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
        stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
    }

    /** Deletes the node at the path derived from the request's host, first port and action. */
    @Override
    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
        ProfileAction profileAction = profileRequest.get_action();
        String host = profileRequest.get_nodeInfo().get_node();
        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
        stateStorage.delete_node(path);
    }

    // need to take executor->node+port in explicitly so that we don't run into a situation where a
    // long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
    // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
    // we avoid situations like that
    /**
     * Collects heartbeats for the given executors, reading one worker heartbeat per assigned node+port.
     *
     * @param executorNodePort executor id ranges (first and last element are used) mapped to their worker
     */
    @Override
    public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
        Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();

        Map<NodeInfo, List<List<Long>>> nodePortExecutors = ClusterUtils.reverseMap(executorNodePort);

        for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
            String node = entry.getKey().get_node();
            Long port = entry.getKey().get_port_iterator().next();
            ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
            List<ExecutorInfo> executorInfoList = new ArrayList<>();
            for (List<Long> list : entry.getValue()) {
                executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
            }
            if (whb != null) {
                executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
            }
        }
        return executorWhbs;
    }

    /**
     * Lists the children of the supervisors subtree.
     *
     * @param callback if non-null, stored and a watch is requested on the subtree
     */
    @Override
    public List<String> supervisors(IFn callback) {
        if (callback != null) {
            supervisorsCallback.set(callback);
        }
        return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
    }

    /** Reads and deserializes the {@link SupervisorInfo} stored for {@code supervisorId}. */
    @Override
    public SupervisorInfo supervisorInfo(String supervisorId) {
        String path = ClusterUtils.supervisorPath(supervisorId);
        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
    }

    /** Creates the worker-heartbeat root of a topology. */
    @Override
    public void setupHeatbeats(String stormId) {
        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
    }

    /**
     * Deletes the worker-heartbeat root of a topology. A failure for which
     * {@code Zookeeper.exceptionCause(KeeperException.class, e)} is true is logged and ignored;
     * any other failure is rethrown.
     */
    @Override
    public void teardownHeartbeats(String stormId) {
        try {
            stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
        } catch (Exception e) {
            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
                // best effort: keep going, but record the cause
                LOG.warn("Could not teardown heartbeats for {}.", stormId, e);
            } else {
                throw e;
            }
        }
    }

    /**
     * Deletes the error root of a topology. A failure for which
     * {@code Zookeeper.exceptionCause(KeeperException.class, e)} is true is logged and ignored;
     * any other failure is rethrown.
     */
    @Override
    public void teardownTopologyErrors(String stormId) {
        try {
            stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
        } catch (Exception e) {
            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
                // best effort: keep going, but record the cause
                LOG.warn("Could not teardown errors for {}.", stormId, e);
            } else {
                throw e;
            }
        }
    }

    /** Lists the children of the worker-heartbeats subtree (no watch). */
    @Override
    public List<String> heartbeatStorms() {
        return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
    }

    /** Lists the children of the errors subtree (no watch). */
    @Override
    public List<String> errorTopologies() {
        return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
    }

    /** Serializes and stores the log configuration of a topology. */
    @Override
    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
    }

    /**
     * Reads and deserializes the log configuration of a topology.
     *
     * @param cb if non-null, stored under {@code stormId} and a watch is requested
     */
    @Override
    public LogConfig topologyLogConfig(String stormId, IFn cb) {
        if (cb != null) {
            // Without this the watch below would fire with nothing to invoke.
            logConfigCallback.put(stormId, cb);
        }
        String path = ClusterUtils.logConfigPath(stormId);
        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
    }

    /** Writes a worker heartbeat; does nothing when {@code info} is {@code null}. */
    @Override
    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
        if (info != null) {
            String path = ClusterUtils.workerbeatPath(stormId, node, port);
            stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
        }
    }

    /** Deletes the heartbeat of the worker at {@code node}/{@code port}. */
    @Override
    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
        String path = ClusterUtils.workerbeatPath(stormId, node, port);
        stateStorage.delete_worker_hb(path);
    }

    /** Writes the supervisor's info as an ephemeral node. */
    @Override
    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
        String path = ClusterUtils.supervisorPath(supervisorId);
        stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
    }

    /**
     * Makes the existence of the worker's backpressure node match {@code on}:
     * deletes the node when it exists and {@code on} is false, creates it as an ephemeral
     * node when it is missing and {@code on} is true, and otherwise does nothing.
     */
    @Override
    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
        String path = ClusterUtils.backpressurePath(stormId, node, port);
        boolean existed = stateStorage.node_exists(path, false);
        if (existed && !on) {
            stateStorage.delete_node(path);
        } else if (!existed && on) {
            stateStorage.set_ephemeral_node(path, null, acls);
        }
    }

    /**
     * Reports whether the topology's backpressure directory has at least one child.
     *
     * @param callback if non-null, stored under {@code stormId} and a watch is requested
     * @return {@code true} when the directory is non-empty, {@code false} when it is empty
     */
    @Override
    public boolean topologyBackpressure(String stormId, IFn callback) {
        if (callback != null) {
            backPressureCallback.put(stormId, callback);
        }
        String path = ClusterUtils.backpressureStormRoot(stormId);
        List<String> children = stateStorage.get_children(path, callback != null);
        return children.size() > 0;
    }

    /** Creates the backpressure root of a topology. */
    @Override
    public void setupBackpressure(String stormId) {
        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
    }

    /** Deletes the backpressure node of the worker at {@code node}/{@code port}. */
    @Override
    public void removeWorkerBackpressure(String stormId, String node, Long port) {
        stateStorage.delete_node(ClusterUtils.backpressurePath(stormId, node, port));
    }

    /** Serializes and stores {@code stormBase} at the topology's storm path. */
    @Override
    public void activateStorm(String stormId, StormBase stormBase) {
        String path = ClusterUtils.stormPath(stormId);
        stateStorage.set_data(path, Utils.serialize(stormBase), acls);
    }

    /**
     * Merges {@code newElems} over the stored {@link StormBase} of the topology and writes the result.
     * Values present in {@code newElems} win; blank, zero or null values are filled from the stored base.
     * {@code newElems} is modified in place.
     */
    // To update this function due to APersistentMap/APersistentSet is clojure's structure
    @Override
    public void updateStorm(String stormId, StormBase newElems) {
        StormBase stormBase = stormBase(stormId, null);

        Map<String, Integer> oldExecutors = stormBase.get_component_executors();
        if (oldExecutors != null) {
            // Copy into a HashMap: the source maps may be Clojure persistent maps, which don't support "put".
            Map<String, Integer> mergedExecutors = new HashMap<>(oldExecutors);
            Map<String, Integer> updatedExecutors = newElems.get_component_executors();
            if (updatedExecutors != null) {
                mergedExecutors.putAll(updatedExecutors);
            }
            if (!mergedExecutors.isEmpty()) {
                newElems.set_component_executors(mergedExecutors);
            }
        }

        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
        if (oldComponentDebug == null) {
            oldComponentDebug = Collections.emptyMap();
        }
        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
        if (newComponentDebug == null) {
            newComponentDebug = Collections.emptyMap();
        }
        // The key sets may be Clojure persistent sets, which don't support addAll, so collect into a HashSet.
        Set<String> debugOptionsKeys = new HashSet<>();
        debugOptionsKeys.addAll(oldComponentDebug.keySet());
        debugOptionsKeys.addAll(newComponentDebug.keySet());

        Map<String, DebugOptions> mergedDebug = new HashMap<>();
        for (String key : debugOptionsKeys) {
            boolean enable = false;
            double samplingpct = 0;
            DebugOptions oldOptions = oldComponentDebug.get(key);
            if (oldOptions != null) {
                enable = oldOptions.is_enable();
                samplingpct = oldOptions.get_samplingpct();
            }
            DebugOptions newOptions = newComponentDebug.get(key);
            if (newOptions != null) {
                enable = newOptions.is_enable();
                // NOTE(review): the new percentage is added to the old one rather than replacing it;
                // kept as found -- confirm this accumulation is intended.
                samplingpct += newOptions.get_samplingpct();
            }
            DebugOptions debugOptions = new DebugOptions();
            debugOptions.set_enable(enable);
            debugOptions.set_samplingpct(samplingpct);
            mergedDebug.put(key, debugOptions);
        }
        if (!mergedDebug.isEmpty()) {
            newElems.set_component_debug(mergedDebug);
        }

        if (StringUtils.isBlank(newElems.get_name())) {
            newElems.set_name(stormBase.get_name());
        }
        if (newElems.get_status() == null) {
            newElems.set_status(stormBase.get_status());
        }
        if (newElems.get_num_workers() == 0) {
            newElems.set_num_workers(stormBase.get_num_workers());
        }
        if (newElems.get_launch_time_secs() == 0) {
            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
        }
        if (StringUtils.isBlank(newElems.get_owner())) {
            newElems.set_owner(stormBase.get_owner());
        }
        if (newElems.get_topology_action_options() == null) {
            newElems.set_topology_action_options(stormBase.get_topology_action_options());
        }
        stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
    }

    /** Deletes the storm-base node of a topology. */
    @Override
    public void removeStormBase(String stormId) {
        stateStorage.delete_node(ClusterUtils.stormPath(stormId));
    }

    /** Serializes and stores the assignment of a topology. */
    @Override
    public void setAssignment(String stormId, Assignment info) {
        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
    }

    /**
     * Registers this nimbus as a holder of blob {@code key} at version {@code versionInfo}:
     * ensures the key's directory exists, removes this nimbus's previous entry via
     * {@code delete_node_blobstore}, then creates an ephemeral node named
     * {@code <host:port>-<version>}.
     */
    @Override
    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
        String keyPath = ClusterUtils.blobstorePath(key);
        String path = keyPath + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
        LOG.info("set-path: {}", path);
        stateStorage.mkdirs(keyPath, acls);
        stateStorage.delete_node_blobstore(keyPath, nimbusInfo.toHostPortString());
        stateStorage.set_ephemeral_node(path, null, acls);
    }

    /** Lists the children of the blobstore subtree (no sync, no watch). */
    @Override
    public List<String> activeKeys() {
        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
    }

    /**
     * Syncs the blobstore subtree and lists its children.
     *
     * @param callback if non-null, stored and a watch is requested on the subtree
     */
    @Override
    public List<String> blobstore(IFn callback) {
        if (callback != null) {
            blobstoreCallback.set(callback);
        }
        stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
    }

    /** Deletes the assignment, credentials, log-config, profiler-config and storm-base nodes of a topology. */
    @Override
    public void removeStorm(String stormId) {
        stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
        stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
        stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
        stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
        removeStormBase(stormId);
    }

    /** Deletes the blobstore node of {@code blobKey}. */
    @Override
    public void removeBlobstoreKey(String blobKey) {
        LOG.debug("remove key {}", blobKey);
        stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
    }

    /** Deletes the max-key-sequence-number node of {@code blobKey}. */
    @Override
    public void removeKeyVersion(String blobKey) {
        stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
    }

    /**
     * Records an error for a component: appends a sequential node under the component's error path,
     * overwrites the component's last-error node, and then deletes the lowest-numbered error nodes
     * until at most 10 remain.
     */
    @Override
    public void reportError(String stormId, String componentId, String node, Long port, String error) {
        String path = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
        ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
        errorInfo.set_host(node);
        errorInfo.set_port(port.intValue());
        byte[] serData = Utils.serialize(errorInfo);
        stateStorage.mkdirs(path, acls);
        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
        stateStorage.set_data(lastErrorPath, serData, acls);
        List<String> children = stateStorage.get_children(path, false);

        // Child names are the "e" prefix followed by the sequence number; order by that number.
        Collections.sort(children, new Comparator<String>() {
            @Override
            public int compare(String arg0, String arg1) {
                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
            }
        });

        while (children.size() > 10) {
            stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + children.remove(0));
        }
    }

    /**
     * Reads all stored errors of a component, most recent first (by {@code error_time_secs}).
     * Any exception is rethrown through {@code Utils.wrapInRuntime}.
     */
    @Override
    public List<ErrorInfo> errors(String stormId, String componentId) {
        List<ErrorInfo> errorInfos = new ArrayList<>();
        try {
            String path = ClusterUtils.errorPath(stormId, componentId);
            if (stateStorage.node_exists(path, false)) {
                for (String child : stateStorage.get_children(path, false)) {
                    String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
                    ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
                    if (errorInfo != null) {
                        errorInfos.add(errorInfo);
                    }
                }
            }
            Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
                @Override
                public int compare(ErrorInfo arg0, ErrorInfo arg1) {
                    // Arguments swapped to get descending order.
                    return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs());
                }
            });
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }

        return errorInfos;
    }

    /** Reads the component's last-error node, or returns {@code null} when that node does not exist. */
    @Override
    public ErrorInfo lastError(String stormId, String componentId) {
        String path = ClusterUtils.lastErrorPath(stormId, componentId);
        if (stateStorage.node_exists(path, false)) {
            return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
        }
        return null;
    }

    /**
     * Stores a topology's credentials using ACLs built by {@code ClusterUtils.mkTopoOnlyAcls(topoConf)}
     * rather than this instance's default ACLs.
     */
    @Override
    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
        List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
        String path = ClusterUtils.credentialsPath(stormId);
        stateStorage.set_data(path, Utils.serialize(creds), aclList);
    }

    /**
     * Reads and deserializes the credentials of a topology.
     *
     * @param callback if non-null, stored under {@code stormId} and a watch is requested
     */
    @Override
    public Credentials credentials(String stormId, IFn callback) {
        if (callback != null) {
            credentialsCallback.put(stormId, callback);
        }
        String path = ClusterUtils.credentialsPath(stormId);
        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
    }

    /** Unregisters the change listener and, when this instance is {@code solo}, closes the storage. */
    @Override
    public void disconnect() {
        stateStorage.unregister(stateId);
        if (solo) {
            stateStorage.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java deleted file mode 100644 index 3a4205b..0000000 --- a/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java +++ /dev/null @@ -1,683 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.cluster; - -import clojure.lang.APersistentMap; -import clojure.lang.IFn; -import clojure.lang.PersistentArrayMap; -import clojure.lang.RT; -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.*; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.storm.callback.Callback; -import org.apache.storm.generated.*; -import org.apache.storm.nimbus.NimbusInfo; -import org.apache.storm.utils.Time; -import org.apache.storm.utils.Utils; -import org.apache.storm.zookeeper.Zookeeper; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.ACL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.UnsupportedEncodingException; -import java.security.NoSuchAlgorithmException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - -public class StormZkClusterState implements StormClusterState { - - private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class); - - private ClusterState clusterState; - - private ConcurrentHashMap<String, IFn> assignmentInfoCallback; - private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback; - private ConcurrentHashMap<String, IFn> assignmentVersionCallback; - private AtomicReference<IFn> supervisorsCallback; - // we want to reigister a topo directory getChildren callback for all workers of this dir - private ConcurrentHashMap<String, IFn> backPressureCallback; - private AtomicReference<IFn> assignmentsCallback; - private ConcurrentHashMap<String, IFn> stormBaseCallback; - private AtomicReference<IFn> blobstoreCallback; - private ConcurrentHashMap<String, IFn> credentialsCallback; - private ConcurrentHashMap<String, IFn> logConfigCallback; - - private List<ACL> acls; - private String stateId; - private boolean solo; - - public StormZkClusterState(Object 
clusterState, List<ACL> acls, ClusterStateContext context) throws Exception { - - if (clusterState instanceof ClusterState) { - solo = false; - this.clusterState = (ClusterState) clusterState; - } else { - - solo = true; - this.clusterState = new DistributedClusterState((Map) clusterState, (Map) clusterState, acls, context); - } - - assignmentInfoCallback = new ConcurrentHashMap<>(); - assignmentInfoWithVersionCallback = new ConcurrentHashMap<>(); - assignmentVersionCallback = new ConcurrentHashMap<>(); - supervisorsCallback = new AtomicReference<>(); - backPressureCallback = new ConcurrentHashMap<>(); - assignmentsCallback = new AtomicReference<>(); - stormBaseCallback = new ConcurrentHashMap<>(); - credentialsCallback = new ConcurrentHashMap<>(); - logConfigCallback = new ConcurrentHashMap<>(); - blobstoreCallback = new AtomicReference<>(); - - stateId = this.clusterState.register(new Callback() { - - public <T> Object execute(T... args) { - if (args == null) { - LOG.warn("Input args is null"); - return null; - } else if (args.length < 2) { - LOG.warn("Input args is invalid, args length:" + args.length); - return null; - } - String path = (String) args[1]; - - List<String> toks = Zookeeper.tokenizePath(path); - int size = toks.size(); - if (size >= 1) { - String params = null; - String root = toks.get(0); - IFn fn = null; - if (root.equals(Cluster.ASSIGNMENTS_ROOT)) { - if (size == 1) { - // set null and get the old value - issueCallback(assignmentsCallback); - } else { - issueMapCallback(assignmentInfoCallback, toks.get(1)); - issueMapCallback(assignmentVersionCallback, toks.get(1)); - issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1)); - } - - } else if (root.equals(Cluster.SUPERVISORS_ROOT)) { - issueCallback(supervisorsCallback); - } else if (root.equals(Cluster.BLOBSTORE_ROOT)) { - issueCallback(blobstoreCallback); - } else if (root.equals(Cluster.STORMS_ROOT) && size > 1) { - issueMapCallback(stormBaseCallback, toks.get(1)); - } else if 
(root.equals(Cluster.CREDENTIALS_ROOT) && size > 1) { - issueMapCallback(credentialsCallback, toks.get(1)); - } else if (root.equals(Cluster.LOGCONFIG_ROOT) && size > 1) { - issueMapCallback(logConfigCallback, toks.get(1)); - } else if (root.equals(Cluster.BACKPRESSURE_ROOT) && size > 1) { - issueMapCallback(logConfigCallback, toks.get(1)); - } else { - LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path); - Runtime.getRuntime().exit(30); - } - - } - - return null; - } - - }); - - String[] pathlist = { Cluster.ASSIGNMENTS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.SUPERVISORS_SUBTREE, Cluster.WORKERBEATS_SUBTREE, - Cluster.ERRORS_SUBTREE, Cluster.BLOBSTORE_SUBTREE, Cluster.NIMBUSES_SUBTREE, Cluster.LOGCONFIG_SUBTREE }; - for (String path : pathlist) { - this.clusterState.mkdirs(path, acls); - } - - } - - protected void issueCallback(AtomicReference<IFn> cb) { - IFn callback = cb.getAndSet(null); - if (callback != null) - callback.invoke(); - } - - protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) { - IFn callback = callbackConcurrentHashMap.remove(key); - if (callback != null) - callback.invoke(); - } - - @Override - public List<String> assignments(IFn callback) { - if (callback != null) { - assignmentsCallback.set(callback); - } - return clusterState.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null); - } - - @Override - public Assignment assignmentInfo(String stormId, IFn callback) { - if (callback != null) { - assignmentInfoCallback.put(stormId, callback); - } - byte[] serialized = clusterState.get_data(Cluster.assignmentPath(stormId), callback != null); - return Cluster.maybeDeserialize(serialized, Assignment.class); - } - - @Override - public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) { - if (callback != null) { - assignmentInfoWithVersionCallback.put(stormId, callback); - } - APersistentMap aPersistentMap = 
clusterState.get_data_with_version(Cluster.assignmentPath(stormId), callback != null); - Assignment assignment = Cluster.maybeDeserialize((byte[]) aPersistentMap.get("data"), Assignment.class); - Integer version = (Integer) aPersistentMap.get("version"); - APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version }); - return map; - } - - @Override - public Integer assignmentVersion(String stormId, IFn callback) throws Exception { - if (callback != null) { - assignmentVersionCallback.put(stormId, callback); - } - return clusterState.get_version(Cluster.assignmentPath(stormId), callback != null); - } - - // blobstore state - @Override - public List<String> blobstoreInfo(String blobKey) { - String path = Cluster.blobstorePath(blobKey); - clusterState.sync_path(path); - return clusterState.get_children(path, false); - } - - @Override - public List nimbuses() { - List<NimbusSummary> nimbusSummaries = new ArrayList<>(); - List<String> nimbusIds = clusterState.get_children(Cluster.NIMBUSES_SUBTREE, false); - for (String nimbusId : nimbusIds) { - byte[] serialized = clusterState.get_data(Cluster.nimbusPath(nimbusId), false); - NimbusSummary nimbusSummary = Cluster.maybeDeserialize(serialized, NimbusSummary.class); - nimbusSummaries.add(nimbusSummary); - } - return nimbusSummaries; - } - - @Override - public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) { - // explicit delete for ephmeral node to ensure this session creates the entry. 
- clusterState.delete_node(Cluster.nimbusPath(nimbusId)); - clusterState.add_listener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { - LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState); - if (connectionState.equals(ConnectionState.RECONNECTED)) { - LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time"); - clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls); - } - - } - }); - - clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls); - } - - @Override - public List<String> activeStorms() { - return clusterState.get_children(Cluster.STORMS_SUBTREE, false); - } - - @Override - public StormBase stormBase(String stormId, IFn callback) { - if (callback != null) { - stormBaseCallback.put(stormId, callback); - } - return Cluster.maybeDeserialize(clusterState.get_data(Cluster.stormPath(stormId), callback != null), StormBase.class); - } - - @Override - public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) { - byte[] bytes = clusterState.get_worker_hb(Cluster.workerbeatPath(stormId, node, port), false); - if (bytes != null) { - return Cluster.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class); - } - return null; - } - - @Override - public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) { - List<ProfileRequest> requests = new ArrayList<>(); - List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift); - for (ProfileRequest profileRequest : profileRequests) { - NodeInfo nodeInfo1 = profileRequest.get_nodeInfo(); - if (nodeInfo1.equals(nodeInfo)) - requests.add(profileRequest); - } - return requests; - } - - @Override - public List<ProfileRequest> getTopologyProfileRequests(String 
stormId, boolean isThrift) { - List<ProfileRequest> profileRequests = new ArrayList<>(); - String path = Cluster.profilerConfigPath(stormId); - if (clusterState.node_exists(path, false)) { - List<String> strs = clusterState.get_children(path, false); - for (String str : strs) { - String childPath = path + Cluster.ZK_SEPERATOR + str; - byte[] raw = clusterState.get_data(childPath, false); - ProfileRequest request = Cluster.maybeDeserialize(raw, ProfileRequest.class); - if (request != null) - profileRequests.add(request); - } - } - return profileRequests; - } - - @Override - public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) { - ProfileAction profileAction = profileRequest.get_action(); - String host = profileRequest.get_nodeInfo().get_node(); - Long port = profileRequest.get_nodeInfo().get_port_iterator().next(); - String path = Cluster.profilerConfigPath(stormId, host, port, profileAction); - clusterState.set_data(path, Utils.serialize(profileRequest), acls); - } - - @Override - public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) { - ProfileAction profileAction = profileRequest.get_action(); - String host = profileRequest.get_nodeInfo().get_node(); - Long port = profileRequest.get_nodeInfo().get_port_iterator().next(); - String path = Cluster.profilerConfigPath(stormId, host, port, profileAction); - clusterState.delete_node(path); - } - - // need to take executor->node+port in explicitly so that we don't run into a situation where a - // long dead worker with a skewed clock overrides all the timestamps. 
By only checking heartbeats - // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, - // we avoid situations like that - @Override - public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) { - Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>(); - - LOG.info(executorNodePort.toString()); - Map<NodeInfo, List<List<Long>>> nodePortExecutors = Cluster.reverseMap(executorNodePort); - LOG.info(nodePortExecutors.toString()); - - for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) { - - String node = entry.getKey().get_node(); - Long port = entry.getKey().get_port_iterator().next(); - ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port); - List<ExecutorInfo> executorInfoList = new ArrayList<>(); - for (List<Long> list : entry.getValue()) { - executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue())); - } - executorWhbs.putAll(Cluster.convertExecutorBeats(executorInfoList, whb)); - } - return executorWhbs; - } - - @Override - public List<String> supervisors(IFn callback) { - if (callback != null) { - supervisorsCallback.set(callback); - } - return clusterState.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null); - } - - @Override - public SupervisorInfo supervisorInfo(String supervisorId) { - String path = Cluster.supervisorPath(supervisorId); - return Cluster.maybeDeserialize(clusterState.get_data(path, false), SupervisorInfo.class); - } - - @Override - public void setupHeatbeats(String stormId) { - clusterState.mkdirs(Cluster.workerbeatStormRoot(stormId), acls); - } - - @Override - public void teardownHeartbeats(String stormId) { - try { - clusterState.delete_worker_hb(Cluster.workerbeatStormRoot(stormId)); - } catch (Exception e) { - if (Zookeeper.exceptionCause(KeeperException.class, e)) { - // do nothing - LOG.warn("Could not teardown 
heartbeats for {}.", stormId); - } else { - throw e; - } - } - } - - @Override - public void teardownTopologyErrors(String stormId) { - try { - clusterState.delete_node(Cluster.errorStormRoot(stormId)); - } catch (Exception e) { - if (Zookeeper.exceptionCause(KeeperException.class, e)) { - // do nothing - LOG.warn("Could not teardown errors for {}.", stormId); - } else { - throw e; - } - } - } - - @Override - public List<String> heartbeatStorms() { - return clusterState.get_worker_hb_children(Cluster.WORKERBEATS_SUBTREE, false); - } - - @Override - public List<String> errorTopologies() { - return clusterState.get_children(Cluster.ERRORS_SUBTREE, false); - } - - @Override - public void setTopologyLogConfig(String stormId, LogConfig logConfig) { - clusterState.set_data(Cluster.logConfigPath(stormId), Utils.serialize(logConfig), acls); - } - - @Override - public LogConfig topologyLogConfig(String stormId, IFn cb) { - String path = Cluster.logConfigPath(stormId); - return Cluster.maybeDeserialize(clusterState.get_data(path, cb != null), LogConfig.class); - } - - @Override - public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) { - if (info != null) { - String path = Cluster.workerbeatPath(stormId, node, port); - clusterState.set_worker_hb(path, Utils.serialize(info), acls); - } - } - - @Override - public void removeWorkerHeartbeat(String stormId, String node, Long port) { - String path = Cluster.workerbeatPath(stormId, node, port); - clusterState.delete_worker_hb(path); - } - - @Override - public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) { - String path = Cluster.supervisorPath(supervisorId); - clusterState.set_ephemeral_node(path, Utils.serialize(info), acls); - } - - // if znode exists and to be not on?, delete; if exists and on?, do nothing; - // if not exists and to be on?, create; if not exists and not on?, do nothing; - @Override - public void workerBackpressure(String stormId, String node, Long 
port, boolean on) { - String path = Cluster.backpressurePath(stormId, node, port); - boolean existed = clusterState.node_exists(path, false); - if (existed) { - if (on == false) - clusterState.delete_node(path); - - } else { - if (on == true) { - clusterState.set_ephemeral_node(path, null, acls); - } - } - } - - // if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not. - @Override - public boolean topologyBackpressure(String stormId, IFn callback) { - if (callback != null) { - backPressureCallback.put(stormId, callback); - } - String path = Cluster.backpressureStormRoot(stormId); - List<String> childrens = clusterState.get_children(path, callback != null); - return childrens.size() > 0; - - } - - @Override - public void setupBackpressure(String stormId) { - clusterState.mkdirs(Cluster.backpressureStormRoot(stormId), acls); - } - - @Override - public void removeWorkerBackpressure(String stormId, String node, Long port) { - clusterState.delete_node(Cluster.backpressurePath(stormId, node, port)); - } - - @Override - public void activateStorm(String stormId, StormBase stormBase) { - String path = Cluster.stormPath(stormId); - clusterState.set_data(path, Utils.serialize(stormBase), acls); - } - - // maybe exit some questions for updateStorm - @Override - public void updateStorm(String stormId, StormBase newElems) { - - StormBase stormBase = stormBase(stormId, null); - if (stormBase.get_component_executors() != null) { - - Map<String, Integer> newComponentExecutors = new HashMap<>(); - Map<String, Integer> componentExecutors = newElems.get_component_executors(); - //componentExecutors maybe be APersistentMap, which don't support put - for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) { - newComponentExecutors.put(entry.getKey(), entry.getValue()); - } - for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) { - if (!componentExecutors.containsKey(entry.getKey())) { - 
newComponentExecutors.put(entry.getKey(), entry.getValue()); - } - } - if (newComponentExecutors.size() > 0) - newElems.set_component_executors(newComponentExecutors); - } - - Map<String, DebugOptions> ComponentDebug = new HashMap<>(); - Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug(); - - Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug(); - - Set<String> debugOptionsKeys = oldComponentDebug.keySet(); - debugOptionsKeys.addAll(newComponentDebug.keySet()); - for (String key : debugOptionsKeys) { - boolean enable = false; - double samplingpct = 0; - if (oldComponentDebug.containsKey(key)) { - enable = oldComponentDebug.get(key).is_enable(); - samplingpct = oldComponentDebug.get(key).get_samplingpct(); - } - if (newComponentDebug.containsKey(key)) { - enable = newComponentDebug.get(key).is_enable(); - samplingpct += newComponentDebug.get(key).get_samplingpct(); - } - DebugOptions debugOptions = new DebugOptions(); - debugOptions.set_enable(enable); - debugOptions.set_samplingpct(samplingpct); - ComponentDebug.put(key, debugOptions); - } - if (ComponentDebug.size() > 0) { - newElems.set_component_debug(ComponentDebug); - } - - - if (StringUtils.isBlank(newElems.get_name())) { - newElems.set_name(stormBase.get_name()); - } - if (newElems.get_status() == null){ - newElems.set_status(stormBase.get_status()); - } - if (newElems.get_num_workers() == 0){ - newElems.set_num_workers(stormBase.get_num_workers()); - } - if (newElems.get_launch_time_secs() == 0) { - newElems.set_launch_time_secs(stormBase.get_launch_time_secs()); - } - if (StringUtils.isBlank(newElems.get_owner())) { - newElems.set_owner(stormBase.get_owner()); - } - if (newElems.get_topology_action_options() == null) { - newElems.set_topology_action_options(stormBase.get_topology_action_options()); - } - if (newElems.get_status() == null) { - newElems.set_status(stormBase.get_status()); - } - clusterState.set_data(Cluster.stormPath(stormId), 
Utils.serialize(newElems), acls); - } - - @Override - public void removeStormBase(String stormId) { - clusterState.delete_node(Cluster.stormPath(stormId)); - } - - @Override - public void setAssignment(String stormId, Assignment info) { - clusterState.set_data(Cluster.assignmentPath(stormId), Utils.serialize(info), acls); - } - - @Override - public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) { - String path = Cluster.blobstorePath(key) + Cluster.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo; - LOG.info("set-path: {}", path); - clusterState.mkdirs(Cluster.blobstorePath(key), acls); - clusterState.delete_node_blobstore(Cluster.blobstorePath(key), nimbusInfo.toHostPortString()); - clusterState.set_ephemeral_node(path, null, acls); - } - - @Override - public List<String> activeKeys() { - return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, false); - } - - // blobstore state - @Override - public List<String> blobstore(IFn callback) { - if (callback != null) { - blobstoreCallback.set(callback); - } - clusterState.sync_path(Cluster.BLOBSTORE_SUBTREE); - return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, callback != null); - - } - - @Override - public void removeStorm(String stormId) { - clusterState.delete_node(Cluster.assignmentPath(stormId)); - clusterState.delete_node(Cluster.credentialsPath(stormId)); - clusterState.delete_node(Cluster.logConfigPath(stormId)); - clusterState.delete_node(Cluster.profilerConfigPath(stormId)); - removeStormBase(stormId); - } - - @Override - public void removeBlobstoreKey(String blobKey) { - LOG.debug("remove key {}", blobKey); - clusterState.delete_node(Cluster.blobstorePath(blobKey)); - } - - @Override - public void removeKeyVersion(String blobKey) { - clusterState.delete_node(Cluster.blobstoreMaxKeySequenceNumberPath(blobKey)); - } - - @Override - public void reportError(String stormId, String componentId, String node, Integer port, String error) { - - try { - String 
path = Cluster.errorPath(stormId, componentId); - String lastErrorPath = Cluster.lastErrorPath(stormId, componentId); - ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs()); - errorInfo.set_host(node); - errorInfo.set_port(port.intValue()); - byte[] serData = Utils.serialize(errorInfo); - clusterState.mkdirs(path, acls); - clusterState.create_sequential(path + Cluster.ZK_SEPERATOR + "e", serData, acls); - clusterState.set_data(lastErrorPath, serData, acls); - List<String> childrens = clusterState.get_children(path, false); - - Collections.sort(childrens); - - while (childrens.size() >= 10) { - clusterState.delete_node(path + Cluster.ZK_SEPERATOR + childrens.remove(0)); - } - } catch (UnsupportedEncodingException e) { - throw Utils.wrapInRuntime(e); - } - } - - @Override - public List<ErrorInfo> errors(String stormId, String componentId) { - List<ErrorInfo> errorInfos = new ArrayList<>(); - try { - String path = Cluster.errorPath(stormId, componentId); - if (clusterState.node_exists(path, false)) { - List<String> childrens = clusterState.get_children(path, false); - for (String child : childrens) { - String childPath = path + Cluster.ZK_SEPERATOR + child; - ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(childPath, false), ErrorInfo.class); - if (errorInfo != null) - errorInfos.add(errorInfo); - } - } - Collections.sort(errorInfos, new Comparator<ErrorInfo>() { - public int compare(ErrorInfo arg0, ErrorInfo arg1) { - return Integer.compare(arg0.get_error_time_secs(), arg1.get_error_time_secs()); - } - }); - } catch (Exception e) { - throw Utils.wrapInRuntime(e); - } - - return errorInfos; - } - - @Override - public ErrorInfo lastError(String stormId, String componentId) { - try { - String path = Cluster.lastErrorPath(stormId, componentId); - if (clusterState.node_exists(path, false)) { - ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(path, false), ErrorInfo.class); - return errorInfo; - } - } catch 
(UnsupportedEncodingException e) { - throw Utils.wrapInRuntime(e); - } - return null; - } - - @Override - public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException { - List<ACL> aclList = Cluster.mkTopoOnlyAcls(topoConf); - String path = Cluster.credentialsPath(stormId); - clusterState.set_data(path, Utils.serialize(creds), aclList); - - } - - @Override - public Credentials credentials(String stormId, IFn callback) { - if (callback != null) { - credentialsCallback.put(stormId, callback); - } - String path = Cluster.credentialsPath(stormId); - return Cluster.maybeDeserialize(clusterState.get_data(path, callback != null), Credentials.class); - - } - - @Override - public void disconnect() { - clusterState.unregister(stateId); - if (solo) - clusterState.close(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java new file mode 100644 index 0000000..8ac0adc --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.cluster; + +import clojure.lang.APersistentMap; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.*; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.storm.Config; +import org.apache.storm.callback.DefaultWatcherCallBack; +import org.apache.storm.callback.WatcherCallBack; +import org.apache.storm.callback.ZKStateChangedCallback; +import org.apache.storm.utils.Utils; +import org.apache.storm.zookeeper.Zookeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ZKStateStorage implements StateStorage { + + private static Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class); + + private ConcurrentHashMap<String, ZKStateChangedCallback> callbacks = new ConcurrentHashMap<String, ZKStateChangedCallback>(); + private CuratorFramework zkWriter; + private CuratorFramework zkReader; + private AtomicBoolean active; + + private boolean isNimbus; + private Map authConf; + private Map<Object, Object> conf; + + public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception { + this.conf = conf; + this.authConf = 
authConf; + if (context.getDaemonType().equals(DaemonType.NIMBUS)) + this.isNimbus = true; + + // just mkdir STORM_ZOOKEEPER_ROOT dir + CuratorFramework zkTemp = mkZk(); + String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)); + Zookeeper.mkdirs(zkTemp, rootPath, acls); + zkTemp.close(); + + active = new AtomicBoolean(true); + zkWriter = mkZk(new WatcherCallBack() { + @Override + public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) { + if (active.get()) { + if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) { + LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path); + } else { + LOG.info("Received event {} : {} : {}", state, type, path); + } + + if (!type.equals(Watcher.Event.EventType.None)) { + for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) { + ZKStateChangedCallback fn = e.getValue(); + fn.changed(type, path); + } + } + } + } + }); + if (isNimbus) { + zkReader = mkZk(new WatcherCallBack() { + @Override + public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) { + if (active.get()) { + if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) { + LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path); + } else { + LOG.debug("Received event {} : {} : {}", state, type, path); + } + + if (!type.equals(Watcher.Event.EventType.None)) { + for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) { + ZKStateChangedCallback fn = e.getValue(); + fn.changed(type, path); + } + } + } + } + }); + } else { + zkReader = zkWriter; + } + + } + + @SuppressWarnings("unchecked") + private CuratorFramework mkZk() throws IOException { + return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "", + new DefaultWatcherCallBack(), authConf); + } + + @SuppressWarnings("unchecked") + private 
CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException { + return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), + String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf); + } + + @Override + public void delete_node_blobstore(String path, String nimbusHostPortInfo) { + + } + + @Override + public String register(ZKStateChangedCallback callback) { + String id = UUID.randomUUID().toString(); + this.callbacks.put(id, callback); + return id; + } + + @Override + public void unregister(String id) { + this.callbacks.remove(id); + } + + @Override + public String create_sequential(String path, byte[] data, List<ACL> acls) { + return Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls); + } + + @Override + public void mkdirs(String path, List<ACL> acls) { + Zookeeper.mkdirs(zkWriter, path, acls); + } + + @Override + public void delete_node(String path) { + Zookeeper.deleteNode(zkWriter, path); + } + + @Override + public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) { + Zookeeper.mkdirs(zkWriter, parentPath(path), acls); + if (Zookeeper.exists(zkWriter, path, false)) { + try { + Zookeeper.setData(zkWriter, path, data); + } catch (Exception e) { + if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) { + Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls); + } else { + throw Utils.wrapInRuntime(e); + } + } + + } else { + Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls); + } + } + + @Override + public Integer get_version(String path, boolean watch) throws Exception { + Integer ret = Zookeeper.getVersion(zkReader, path, watch); + return ret; + } + + @Override + public boolean node_exists(String path, boolean watch) { + return Zookeeper.existsNode(zkWriter, path, watch); + } + + @Override + public List<String> get_children(String path, boolean 
watch) { + return Zookeeper.getChildren(zkReader, path, watch); + } + + @Override + public void close() { + this.active.set(false); + zkWriter.close(); + if (isNimbus) { + zkReader.close(); + } + } + + @Override + public void set_data(String path, byte[] data, List<ACL> acls) { + if (Zookeeper.exists(zkWriter, path, false)) { + Zookeeper.setData(zkWriter, path, data); + } else { + Zookeeper.mkdirs(zkWriter, parentPath(path), acls); + Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls); + } + } + + @Override + public byte[] get_data(String path, boolean watch) { + byte[] ret = null; + + ret = Zookeeper.getData(zkReader, path, watch); + + return ret; + } + + @Override + public APersistentMap get_data_with_version(String path, boolean watch) { + return Zookeeper.getDataWithVersion(zkReader, path, watch); + } + + @Override + public void set_worker_hb(String path, byte[] data, List<ACL> acls) { + set_data(path, data, acls); + } + + @Override + public byte[] get_worker_hb(String path, boolean watch) { + return Zookeeper.getData(zkReader, path, watch); + } + + @Override + public List<String> get_worker_hb_children(String path, boolean watch) { + return get_children(path, watch); + } + + @Override + public void delete_worker_hb(String path) { + delete_node(path); + } + + @Override + public void add_listener(final ConnectionStateListener listener) { + Zookeeper.addListener(zkReader, new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { + listener.stateChanged(curatorFramework, connectionState); + } + }); + } + + @Override + public void sync_path(String path) { + Zookeeper.syncPath(zkWriter, path); + } + + // To be remove when finished port Util.clj + public static String parentPath(String path) { + List<String> toks = Zookeeper.tokenizePath(path); + int size = toks.size(); + if (size > 0) { + toks.remove(size - 1); + } + return Zookeeper.toksToPath(toks); + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java new file mode 100644 index 0000000..19b04f2 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.cluster; + +import clojure.lang.APersistentMap; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.data.ACL; + +import java.util.List; + +public class ZKStateStorageFactory implements StateStorageFactory{ + + @Override + public StateStorage mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) { + try { + return new ZKStateStorage(config, auth_conf, acls, context); + }catch (Exception e){ + throw Utils.wrapInRuntime(e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java index 5d67a54..2f1440c 100644 --- a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java +++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java @@ -16,16 +16,16 @@ */ package org.apache.storm.testing.staticmocking; -import org.apache.storm.cluster.Cluster; +import org.apache.storm.cluster.ClusterUtils; public class MockedCluster implements AutoCloseable { - public MockedCluster(Cluster inst) { - Cluster.setInstance(inst); + public MockedCluster(ClusterUtils inst) { + ClusterUtils.setInstance(inst); } @Override public void close() throws Exception { - Cluster.resetInstance(); + ClusterUtils.resetInstance(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java index f1c7f32..c280515 100644 --- 
a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java @@ -86,19 +86,23 @@ public class Zookeeper { _instance = INSTANCE; } - public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root) { - return mkClient(conf, servers, port, root, new DefaultWatcherCallBack()); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root) { + return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack()); } - public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, Map authConf) { - return mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), authConf); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf) { + return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf); } - public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, Map authConf) { - return mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), authConf); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf) { + return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf); } public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) { + return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf); + } + + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) { CuratorFramework fk; if (authConf != null) { fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf)); @@ -124,8 +128,8 @@ public class Zookeeper { * * @return */ - public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String 
root, final WatcherCallBack watcher) { - return mkClient(conf, servers, port, root, watcher, null); + public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { + return mkClientImpl(conf, servers, port, root, watcher, null); } public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) { @@ -347,7 +351,7 @@ public class Zookeeper { protected ILeaderElector zkLeaderElectorImpl(Map conf) throws UnknownHostException { List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); - CuratorFramework zk = mkClient(conf, servers, port, "", conf); + CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf); String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock"; String id = NimbusInfo.fromConf(conf).toHostPortString(); AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id)); http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj index d374511..d4fab3f 100644 --- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj +++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj @@ -21,7 +21,7 @@ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout]) (:import [org.apache.storm.tuple Fields]) - (:import [org.apache.storm.cluster StormZkClusterState]) + (:import [org.apache.storm.cluster StormClusterStateImpl]) (:use [org.apache.storm 
testing config clojure util converter]) (:use [org.apache.storm.daemon common]) (:require [org.apache.storm [thrift :as thrift]])) @@ -576,7 +576,7 @@ (:topology tracked)) _ (advance-cluster-time cluster 11) storm-id (get-storm-id state "test-errors") - errors-count (fn [] (count (clojurify-error (.errors state storm-id "2"))))] + errors-count (fn [] (count (.errors state storm-id "2")))] (is (nil? (clojurify-error (.lastError state storm-id "2")))) http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index d0b9882..fa34355 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -23,9 +23,10 @@ (:import [org.mockito.exceptions.base MockitoAssertionError]) (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder]) (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils]) - (:import [org.apache.storm.cluster ClusterState DistributedClusterState ClusterStateContext StormZkClusterState]) + (:import [org.apache.storm.cluster StateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils]) (:import [org.apache.storm.zookeeper Zookeeper]) - (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) + (:import [org.apache.storm.callback ZKStateChangedCallback]) + (:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster]) (:require [conjure.core]) (:use [conjure core]) (:use [clojure test]) @@ -33,18 +34,18 @@ (defn mk-config [zk-port] (merge (clojurify-structure (ConfigUtils/readStormConfig)) - {STORM-ZOOKEEPER-PORT zk-port - STORM-ZOOKEEPER-SERVERS ["localhost"]})) + {STORM-ZOOKEEPER-PORT zk-port + STORM-ZOOKEEPER-SERVERS 
["localhost"]})) (defn mk-state ([zk-port] (let [conf (mk-config zk-port)] - (DistributedClusterState. conf conf nil (ClusterStateContext.)))) + (ClusterUtils/mkDistributedClusterState conf conf nil (ClusterStateContext.)))) ([zk-port cb] - (let [ret (mk-state zk-port)] - (.register ret cb) - ret ))) + (let [ret (mk-state zk-port)] + (.register ret cb) + ret))) -(defn mk-storm-state [zk-port] (StormZkClusterState. (mk-config zk-port) nil (ClusterStateContext.))) +(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.))) (deftest test-basics (with-inprocess-zookeeper zk-port @@ -99,24 +100,27 @@ (defn mk-callback-tester [] (let [last (atom nil) - cb (fn [type path] - (reset! last {:type type :path path}))] + cb (reify + ZKStateChangedCallback + (changed + [this type path] + (reset! last {:type type :path path})))] [last cb] )) (defn read-and-reset! [aatom] (let [time (System/currentTimeMillis)] - (loop [] - (if-let [val @aatom] - (do - (reset! aatom nil) - val) - (do - (when (> (- (System/currentTimeMillis) time) 30000) - (throw (RuntimeException. "Waited too long for atom to change state"))) - (Thread/sleep 10) - (recur)) - )))) + (loop [] + (if-let [val @aatom] + (do + (reset! aatom nil) + val) + (do + (when (> (- (System/currentTimeMillis) time) 30000) + (throw (RuntimeException. 
"Waited too long for atom to change state"))) + (Thread/sleep 10) + (recur)) + )))) (deftest test-callbacks (with-inprocess-zookeeper zk-port @@ -189,35 +193,35 @@ (is (= #{"storm1" "storm3"} (set (.assignments state nil)))) (is (= assignment2 (clojurify-assignment (.assignmentInfo state "storm1" nil)))) (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm3" nil)))) - - (is (= [] (.active-storms state))) + + (is (= [] (.activeStorms state))) (.activateStorm state "storm1" (thriftify-storm-base base1)) - (is (= ["storm1"] (.active-storms state))) + (is (= ["storm1"] (.activeStorms state))) (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil)))) (is (= nil (clojurify-storm-base (.stormBase state "storm2" nil)))) (.activateStorm state "storm2" (thriftify-storm-base base2)) (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil)))) (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil)))) - (is (= #{"storm1" "storm2"} (set (.active-storms state)))) + (is (= #{"storm1" "storm2"} (set (.activeStorms state)))) (.removeStormBase state "storm1") (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil)))) - (is (= #{"storm2"} (set (.active-storms state)))) + (is (= #{"storm2"} (set (.activeStorms state)))) (is (nil? (clojurify-crdentials (.credentials state "storm1" nil)))) - (.setCredentials! 
state "storm1" (thriftify-credentials {"a" "a"}) {}) + (.setCredentials state "storm1" (thriftify-credentials {"a" "a"}) {}) (is (= {"a" "a"} (clojurify-crdentials (.credentials state "storm1" nil)))) (.setCredentials state "storm1" (thriftify-credentials {"b" "b"}) {}) (is (= {"b" "b"} (clojurify-crdentials (.credentials state "storm1" nil)))) - (is (= [] (.blobstoreInfo state nil))) - (.setupBlobstore state "key1" nimbusInfo1 "1") - (is (= ["key1"] (.blobstoreInfo state nil))) + (is (= [] (.blobstoreInfo state ""))) + (.setupBlobstore state "key1" nimbusInfo1 (Integer/parseInt "1")) + (is (= ["key1"] (.blobstoreInfo state ""))) (is (= [(str (.toHostPortString nimbusInfo1) "-1")] (.blobstoreInfo state "key1"))) - (.setupBlobstore state "key1" nimbusInfo2 "1") + (.setupBlobstore state "key1" nimbusInfo2 (Integer/parseInt "1")) (is (= #{(str (.toHostPortString nimbusInfo1) "-1") (str (.toHostPortString nimbusInfo2) "-1")} (set (.blobstoreInfo state "key1")))) (.removeBlobstoreKey state "key1") - (is (= [] (.blobstoreInfo state nil))) + (is (= [] (.blobstoreInfo state ""))) (is (= [] (.nimbuses state))) (.addNimbusHost state "nimbus1:port" nimbusSummary1) @@ -230,11 +234,10 @@ ))) (defn- validate-errors! [state storm-id component errors-list] - (let [errors (clojurify-error (.errors state storm-id component))] - ;;(println errors) + (let [errors (map clojurify-error (.errors state storm-id component))] (is (= (count errors) (count errors-list))) (doseq [[error target] (map vector errors errors-list)] - (when-not (.contains (:error error) target) + (when-not (.contains (:error error) target) (println target " => " (:error error))) (is (.contains (:error error) target)) ))) @@ -257,8 +260,9 @@ (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (IllegalArgumentException.))) (advance-time-secs! 2)) (validate-errors! 
state "a" "2" (concat (repeat 5 "IllegalArgumentException") - (repeat 5 "RuntimeException") - )) + (repeat 5 "RuntimeException") + )) + (.disconnect state) )))) @@ -285,23 +289,23 @@ (with-inprocess-zookeeper zk-port (let [builder (Mockito/mock CuratorFrameworkFactory$Builder) conf (merge - (mk-config zk-port) - {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10 - STORM-ZOOKEEPER-SESSION-TIMEOUT 10 - STORM-ZOOKEEPER-RETRY-INTERVAL 5 - STORM-ZOOKEEPER-RETRY-TIMES 2 - STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15 - STORM-ZOOKEEPER-AUTH-SCHEME "digest" - STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})] + (mk-config zk-port) + {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10 + STORM-ZOOKEEPER-SESSION-TIMEOUT 10 + STORM-ZOOKEEPER-RETRY-INTERVAL 5 + STORM-ZOOKEEPER-RETRY-TIMES 2 + STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15 + STORM-ZOOKEEPER-AUTH-SCHEME "digest" + STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})] (. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder)) (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder)) (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder)) (TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf)) (is (nil? - (try - (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD)))) - (catch MockitoAssertionError e - e))))))) + (try + (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD)))) + (catch MockitoAssertionError e + e))))))) (deftest test-storm-state-callbacks ;; TODO finish @@ -309,13 +313,17 @@ (deftest test-cluster-state-default-acls (testing "The default ACLs are empty." - (let [zk-mock (Mockito/mock Zookeeper)] + (let [zk-mock (Mockito/mock Zookeeper) + curator-frameworke (reify CuratorFramework (^void close [this] nil))] ;; No need for when clauses because we just want to return nil (with-open [_ (MockedZookeeper. zk-mock)] - (. 
(Mockito/when (Mockito/mock Zookeeper)) (thenReturn (reify CuratorFramework (^void close [this] nil)))) - (. (Mockito/when (Mockito/mock DistributedClusterState)) (thenReturn {})) - (. (Mockito/when (Mockito/mock StormZkClusterState)) (thenReturn (reify ClusterState - (register [this callback] nil) - (mkdirs [this path acls] nil)))) - (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil)))))) - + (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/anyList) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/anyMap))) (thenReturn curator-frameworke)) + (ClusterUtils/mkDistributedClusterState {} nil nil (ClusterStateContext.)) + (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil)))) + (let [distributed-state-storage (reify StateStorage + (register [this callback] nil) + (mkdirs [this path acls] nil)) + cluster-utils (Mockito/mock ClusterUtils)] + (with-open [mocked-cluster (MockedCluster. cluster-utils)] + (. (Mockito/when (.mkDistributedClusterStateImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage)) + (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.)))))) \ No newline at end of file
