Repository: storm Updated Branches: refs/heads/master e2563a195 -> 71aa68f09
STORM-3260: Add in support to print some state Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6828ecae Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6828ecae Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6828ecae Branch: refs/heads/master Commit: 6828ecae38c79522c3b7c9ab590d54ececf6d4c3 Parents: 66b8f50 Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Wed Oct 17 17:02:40 2018 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Wed Oct 17 17:02:40 2018 -0500 ---------------------------------------------------------------------- .../org/apache/storm/blobstore/BlobStore.java | 7 +- .../org/apache/storm/command/AdminCommands.java | 190 ++++++++++++++++++- .../org/apache/storm/daemon/nimbus/Nimbus.java | 3 - 3 files changed, 187 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6828ecae/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java index 00b632d..cb2928c 100644 --- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java +++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; * Modifying the replication factor only works for HdfsBlobStore as for the LocalFsBlobStore the replication is dependent on the number of * Nimbodes available. */ -public abstract class BlobStore implements Shutdownable { +public abstract class BlobStore implements Shutdownable, AutoCloseable { protected static final String BASE_BLOBS_DIR_NAME = "blobs"; private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class); private static final KeyFilter<String> TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key); @@ -192,6 +192,11 @@ public abstract class BlobStore implements Shutdownable { public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException; + @Override + public void close() { + shutdown(); + } + /** * Filters keys based on the KeyFilter passed as the argument. * http://git-wip-us.apache.org/repos/asf/storm/blob/6828ecae/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index 8fabe6c..564a01c 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -19,20 +19,30 @@ package org.apache.storm.command; import com.google.common.collect.Sets; +import java.io.File; import java.io.PrintStream; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.commons.io.FileUtils; import org.apache.storm.blobstore.BlobStore; import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.nimbus.Nimbus; +import org.apache.storm.generated.Assignment; import org.apache.storm.generated.Credentials; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SupervisorInfo; import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.shade.org.apache.zookeeper.ZkCli; +import org.apache.storm.thrift.TBase; +import org.apache.storm.thrift.TFieldIdEnum; +import org.apache.storm.thrift.meta_data.FieldMetaData; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; @@ -61,16 +71,17 @@ public class AdminCommands { private static class RemoveCorruptTopologies implements AdminCommand { @Override public void run(String[] args, Map<String, Object> conf, String command) throws Exception { - BlobStore nimbusBlobStore = ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf), null); - IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); + try (BlobStore nimbusBlobStore = ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf), null)) { + IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); - Set<String> blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key)); - Set<String> activeTopologyIds = new HashSet<>(stormClusterState.activeStorms()); - Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds, blobStoreTopologyIds); - LOG.info("active-topology-ids [{}] blob-topology-ids [{}] diff-topology [{}]", - activeTopologyIds, blobStoreTopologyIds, diffTopology); - for (String corruptId : diffTopology) { - stormClusterState.removeStorm(corruptId); + Set<String> blobStoreTopologyIds = nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key)); + Set<String> activeTopologyIds = new HashSet<>(stormClusterState.activeStorms()); + Sets.SetView<String> diffTopology = Sets.difference(activeTopologyIds, blobStoreTopologyIds); + LOG.info("active-topology-ids [{}] blob-topology-ids [{}] diff-topology [{}]", + activeTopologyIds, blobStoreTopologyIds, diffTopology); + for (String corruptId : diffTopology) { + stormClusterState.removeStorm(corruptId); + } } } @@ -104,6 +115,164 @@ public class AdminCommands { } } + /** + * Print value in a human readable format. + * @param value what to print. + * @return a human readable string + */ + public static String prettyPrint(TBase value) { + StringBuilder builder = new StringBuilder(); + prettyPrint(value, 0, builder); + return builder.toString(); + } + + private static void println(StringBuilder out, int depth, Object value) { + for (int i = 0; i < depth; i++) { + out.append("\t"); + } + out.append(value); + out.append("\n"); + } + + private static void prettyPrint(TBase value, int depth, StringBuilder out) { + if (value == null) { + println(out, depth,"null"); + return; + } + println(out, depth, "{"); + prettyPrintFields(value, depth + 1, out); + println(out, depth, "}"); + } + + private static void prettyPrintFields(TBase value, int depth, StringBuilder out) { + for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : FieldMetaData.getStructMetaDataMap(value.getClass()).entrySet()) { + TFieldIdEnum key = entry.getKey(); + if (!value.isSet(key)) { + println(out, depth, key.getFieldName() + ": not set"); + } else { + Object o = value.getFieldValue(key); + prettyPrintKeyValue(key.getFieldName(), o, depth, out); + } + } + } + + private static String keyStr(String key) { + return key == null ? "" : (key + ": "); + } + + private static void prettyPrintKeyValue(String key, Object o, int depth, StringBuilder out) { + //Special cases for storm... + if ("json_conf".equals(key) && o instanceof String) { + try { + o = Utils.parseJson((String)o); + } catch (Exception e) { + LOG.error("Could not parse json_conf as JSON", e); + } + } + if (o instanceof TBase) { + println(out, depth, keyStr(key) + "{"); + prettyPrintFields((TBase) o, depth + 1, out); + println(out, depth, "}"); + } else if (o instanceof Map) { + println(out, depth, keyStr(key) + "{"); + for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) o).entrySet()) { + prettyPrintKeyValue(entry.getKey().toString(), entry.getValue(), depth + 1, out); + } + println(out, depth, "}"); + } else if (o instanceof Collection) { + println(out, depth, keyStr(key) + "["); + for (Object sub: (Collection)o) { + prettyPrintKeyValue(null, sub, depth + 1, out); + } + println(out, depth, "]"); + } else if (o instanceof String) { + println(out, depth, keyStr(key) + "\"" + o + "\""); + } else { + println(out, depth, keyStr(key) + o); + } + } + + private static class PrintTopo implements AdminCommand { + + @Override + public void run(String[] args, Map<String, Object> conf, String command) throws Exception { + for (String arg: args) { + System.out.println(arg + ":"); + StormTopology topo; + File f = new File(arg); + if (f.exists()) { + topo = Utils.deserialize(FileUtils.readFileToByteArray(f), StormTopology.class); + } else { //assume it is a topology id + final String key = ConfigUtils.masterStormCodeKey(arg); + try (BlobStore store = ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf), null)) { + topo = Utils.deserialize(store.readBlob(key, Nimbus.NIMBUS_SUBJECT), StormTopology.class); + } + } + + System.out.println(prettyPrint(topo)); + } + } + + @Override + public void printCliHelp(String command, PrintStream out) { + out.println(command + " [topology_id|file]*:"); + out.println("\tPrint a human readable version of the topology"); + } + } + + private static class PrintSupervisors implements AdminCommand { + @Override + public void run(String[] args, Map<String, Object> conf, String command) throws Exception { + IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); + Map<String, SupervisorInfo> infos = stormClusterState.allSupervisorInfo(); + if (args.length <= 0) { + for (Map.Entry<String, SupervisorInfo> entry: infos.entrySet()) { + System.out.println(entry.getKey() + ":"); + System.out.println(prettyPrint(entry.getValue())); + } + } else { + for (String arg : args) { + System.out.println(arg + ":"); + System.out.println(prettyPrint(infos.get(arg))); + } + } + } + + @Override + public void printCliHelp(String command, PrintStream out) { + out.println(command + " [supervisor_id]*:"); + out.println("\tPrint a human readable version of the supervisor info(s). Print all if no args"); + } + } + + private static class PrintAssignments implements AdminCommand { + @Override + public void run(String[] args, Map<String, Object> conf, String command) throws Exception { + IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); + stormClusterState.syncRemoteAssignments(null); + stormClusterState.syncRemoteIds(null); + stormClusterState.setAssignmentsBackendSynchronized(); + Map<String, Assignment> infos = stormClusterState.assignmentsInfo(); + if (args.length <= 0) { + for (Map.Entry<String, Assignment> entry: infos.entrySet()) { + System.out.println(entry.getKey() + ":"); + System.out.println(prettyPrint(entry.getValue())); + } + } else { + for (String arg : args) { + System.out.println(arg + ":"); + System.out.println(prettyPrint(infos.get(arg))); + } + } + } + + @Override + public void printCliHelp(String command, PrintStream out) { + out.println(command + " [topology_id]*:"); + out.println("\tPrint a human readable version of the topologies assignment info(s). Print all if no args"); + } + } + private static class Help implements AdminCommand { @Override @@ -135,6 +304,9 @@ public class AdminCommands { COMMANDS.put("zk_cli", new ZkCli()); COMMANDS.put("creds", new CredentialsDebug()); COMMANDS.put("help", new Help()); + COMMANDS.put("print_topo", new PrintTopo()); + COMMANDS.put("print_super", new PrintSupervisors()); + COMMANDS.put("print_assignment", new PrintAssignments()); } static void help(String message, PrintStream out) { http://git-wip-us.apache.org/repos/asf/storm/blob/6828ecae/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index 3fa2f5d..f938b1c 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -1620,9 +1620,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon { IStormClusterState clusterState = stormClusterState; BlobStore store = blobStore; String jarKey = ConfigUtils.masterStormJarKey(topoId); - String codeKey = ConfigUtils.masterStormCodeKey(topoId); - String confKey = ConfigUtils.masterStormConfKey(topoId); - NimbusInfo hostPortInfo = nimbusHostPortInfo; if (tmpJarLocation != null) { //in local mode there is no jar try (FileInputStream fin = new FileInputStream(tmpJarLocation)) {