Repository: storm
Updated Branches:
  refs/heads/master e2563a195 -> 71aa68f09


STORM-3260: Add in support to print some state


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6828ecae
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6828ecae
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6828ecae

Branch: refs/heads/master
Commit: 6828ecae38c79522c3b7c9ab590d54ececf6d4c3
Parents: 66b8f50
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Oct 17 17:02:40 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Wed Oct 17 17:02:40 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/blobstore/BlobStore.java   |   7 +-
 .../org/apache/storm/command/AdminCommands.java | 190 ++++++++++++++++++-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   3 -
 3 files changed, 187 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6828ecae/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index 00b632d..cb2928c 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
  * Modifying the replication factor only works for HdfsBlobStore as for the 
LocalFsBlobStore the replication is dependent on the number of
  * Nimbodes available.
  */
-public abstract class BlobStore implements Shutdownable {
+public abstract class BlobStore implements Shutdownable, AutoCloseable {
     protected static final String BASE_BLOBS_DIR_NAME = "blobs";
     private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
     private static final KeyFilter<String> TO_TOPO_ID = (key) -> 
ConfigUtils.getIdFromBlobKey(key);
@@ -192,6 +192,11 @@ public abstract class BlobStore implements Shutdownable {
     public abstract int updateBlobReplication(String key, int replication, 
Subject who) throws AuthorizationException, KeyNotFoundException,
         IOException;
 
+    @Override
+    public void close() {
+        shutdown();
+    }
+
     /**
      * Filters keys based on the KeyFilter passed as the argument.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/6828ecae/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
index 8fabe6c..564a01c 100644
--- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
+++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java
@@ -19,20 +19,30 @@
 package org.apache.storm.command;
 
 import com.google.common.collect.Sets;
+import java.io.File;
 import java.io.PrintStream;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.cluster.ClusterStateContext;
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.nimbus.Nimbus;
+import org.apache.storm.generated.Assignment;
 import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SupervisorInfo;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.shade.org.apache.zookeeper.ZkCli;
+import org.apache.storm.thrift.TBase;
+import org.apache.storm.thrift.TFieldIdEnum;
+import org.apache.storm.thrift.meta_data.FieldMetaData;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -61,16 +71,17 @@ public class AdminCommands {
     private static class RemoveCorruptTopologies implements AdminCommand {
         @Override
         public void run(String[] args, Map<String, Object> conf, String 
command) throws Exception {
-            BlobStore nimbusBlobStore = ServerUtils.getNimbusBlobStore(conf, 
NimbusInfo.fromConf(conf), null);
-            IStormClusterState stormClusterState = 
ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
+            try (BlobStore nimbusBlobStore = 
ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf), null)) {
+                IStormClusterState stormClusterState = 
ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
 
-            Set<String> blobStoreTopologyIds = 
nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key));
-            Set<String> activeTopologyIds = new 
HashSet<>(stormClusterState.activeStorms());
-            Sets.SetView<String> diffTopology = 
Sets.difference(activeTopologyIds, blobStoreTopologyIds);
-            LOG.info("active-topology-ids [{}] blob-topology-ids [{}] 
diff-topology [{}]",
-                activeTopologyIds, blobStoreTopologyIds, diffTopology);
-            for (String corruptId : diffTopology) {
-                stormClusterState.removeStorm(corruptId);
+                Set<String> blobStoreTopologyIds = 
nimbusBlobStore.filterAndListKeys(key -> ConfigUtils.getIdFromBlobKey(key));
+                Set<String> activeTopologyIds = new 
HashSet<>(stormClusterState.activeStorms());
+                Sets.SetView<String> diffTopology = 
Sets.difference(activeTopologyIds, blobStoreTopologyIds);
+                LOG.info("active-topology-ids [{}] blob-topology-ids [{}] 
diff-topology [{}]",
+                    activeTopologyIds, blobStoreTopologyIds, diffTopology);
+                for (String corruptId : diffTopology) {
+                    stormClusterState.removeStorm(corruptId);
+                }
             }
         }
 
@@ -104,6 +115,164 @@ public class AdminCommands {
         }
     }
 
+    /**
+     * Print value in a human readable format.
+     * @param value what to print.
+     * @return a human readable string
+     */
+    public static String prettyPrint(TBase value) {
+        StringBuilder builder = new StringBuilder();
+        prettyPrint(value, 0, builder);
+        return builder.toString();
+    }
+
+    private static void println(StringBuilder out, int depth, Object value) {
+        for (int i = 0; i < depth; i++) {
+            out.append("\t");
+        }
+        out.append(value);
+        out.append("\n");
+    }
+
+    private static void prettyPrint(TBase value, int depth, StringBuilder out) 
{
+        if (value == null) {
+            println(out, depth,"null");
+            return;
+        }
+        println(out, depth, "{");
+        prettyPrintFields(value, depth + 1, out);
+        println(out, depth, "}");
+    }
+
+    private static void prettyPrintFields(TBase value, int depth, 
StringBuilder out) {
+        for (Map.Entry<? extends TFieldIdEnum, FieldMetaData> entry : 
FieldMetaData.getStructMetaDataMap(value.getClass()).entrySet()) {
+            TFieldIdEnum key = entry.getKey();
+            if (!value.isSet(key)) {
+                println(out, depth, key.getFieldName() + ": not set");
+            } else {
+                Object o = value.getFieldValue(key);
+                prettyPrintKeyValue(key.getFieldName(), o, depth, out);
+            }
+        }
+    }
+
+    private static String keyStr(String key) {
+        return key == null ? "" : (key + ": ");
+    }
+
+    private static void prettyPrintKeyValue(String key, Object o, int depth, 
StringBuilder out) {
+        //Special cases for storm...
+        if ("json_conf".equals(key) && o instanceof String) {
+            try {
+                o = Utils.parseJson((String)o);
+            } catch (Exception e) {
+                LOG.error("Could not parse json_conf as JSON", e);
+            }
+        }
+        if (o instanceof TBase) {
+            println(out, depth, keyStr(key) + "{");
+            prettyPrintFields((TBase) o, depth + 1, out);
+            println(out, depth, "}");
+        } else if (o instanceof Map) {
+            println(out, depth, keyStr(key) + "{");
+            for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) 
o).entrySet()) {
+                prettyPrintKeyValue(entry.getKey().toString(), 
entry.getValue(), depth + 1, out);
+            }
+            println(out, depth, "}");
+        } else if (o instanceof Collection) {
+            println(out, depth, keyStr(key) + "[");
+            for (Object sub: (Collection)o) {
+                prettyPrintKeyValue(null, sub, depth + 1, out);
+            }
+            println(out, depth, "]");
+        } else if (o instanceof String) {
+            println(out, depth, keyStr(key) + "\"" + o + "\"");
+        } else {
+            println(out, depth, keyStr(key) + o);
+        }
+    }
+
+    private static class PrintTopo implements AdminCommand {
+
+        @Override
+        public void run(String[] args, Map<String, Object> conf, String 
command) throws Exception {
+            for (String arg: args) {
+                System.out.println(arg + ":");
+                StormTopology topo;
+                File f = new File(arg);
+                if (f.exists()) {
+                    topo = Utils.deserialize(FileUtils.readFileToByteArray(f), 
StormTopology.class);
+                } else { //assume it is a topology id
+                    final String key = ConfigUtils.masterStormCodeKey(arg);
+                    try (BlobStore store = 
ServerUtils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf), null)) {
+                        topo = Utils.deserialize(store.readBlob(key, 
Nimbus.NIMBUS_SUBJECT), StormTopology.class);
+                    }
+                }
+
+                System.out.println(prettyPrint(topo));
+            }
+        }
+
+        @Override
+        public void printCliHelp(String command, PrintStream out) {
+            out.println(command + " [topology_id|file]*:");
+            out.println("\tPrint a human readable version of the topology");
+        }
+    }
+
+    private static class PrintSupervisors implements AdminCommand {
+        @Override
+        public void run(String[] args, Map<String, Object> conf, String 
command) throws Exception {
+            IStormClusterState stormClusterState = 
ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
+            Map<String, SupervisorInfo> infos = 
stormClusterState.allSupervisorInfo();
+            if (args.length <= 0) {
+                for (Map.Entry<String, SupervisorInfo> entry: 
infos.entrySet()) {
+                    System.out.println(entry.getKey() + ":");
+                    System.out.println(prettyPrint(entry.getValue()));
+                }
+            } else {
+                for (String arg : args) {
+                    System.out.println(arg + ":");
+                    System.out.println(prettyPrint(infos.get(arg)));
+                }
+            }
+        }
+
+        @Override
+        public void printCliHelp(String command, PrintStream out) {
+            out.println(command + " [supervisor_id]*:");
+            out.println("\tPrint a human readable version of the supervisor 
info(s). Print all if no args");
+        }
+    }
+
+    private static class PrintAssignments implements AdminCommand {
+        @Override
+        public void run(String[] args, Map<String, Object> conf, String 
command) throws Exception {
+            IStormClusterState stormClusterState = 
ClusterUtils.mkStormClusterState(conf, new 
ClusterStateContext(DaemonType.NIMBUS, conf));
+            stormClusterState.syncRemoteAssignments(null);
+            stormClusterState.syncRemoteIds(null);
+            stormClusterState.setAssignmentsBackendSynchronized();
+            Map<String, Assignment> infos = 
stormClusterState.assignmentsInfo();
+            if (args.length <= 0) {
+                for (Map.Entry<String, Assignment> entry: infos.entrySet()) {
+                    System.out.println(entry.getKey() + ":");
+                    System.out.println(prettyPrint(entry.getValue()));
+                }
+            } else {
+                for (String arg : args) {
+                    System.out.println(arg + ":");
+                    System.out.println(prettyPrint(infos.get(arg)));
+                }
+            }
+        }
+
+        @Override
+        public void printCliHelp(String command, PrintStream out) {
+            out.println(command + " [topology_id]*:");
+            out.println("\tPrint a human readable version of the topologies 
assignment info(s). Print all if no args");
+        }
+    }
+
     private static class Help implements AdminCommand {
 
         @Override
@@ -135,6 +304,9 @@ public class AdminCommands {
         COMMANDS.put("zk_cli", new ZkCli());
         COMMANDS.put("creds", new CredentialsDebug());
         COMMANDS.put("help", new Help());
+        COMMANDS.put("print_topo", new PrintTopo());
+        COMMANDS.put("print_super", new PrintSupervisors());
+        COMMANDS.put("print_assignment", new PrintAssignments());
     }
 
     static void help(String message, PrintStream out) {

http://git-wip-us.apache.org/repos/asf/storm/blob/6828ecae/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 3fa2f5d..f938b1c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1620,9 +1620,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         IStormClusterState clusterState = stormClusterState;
         BlobStore store = blobStore;
         String jarKey = ConfigUtils.masterStormJarKey(topoId);
-        String codeKey = ConfigUtils.masterStormCodeKey(topoId);
-        String confKey = ConfigUtils.masterStormConfKey(topoId);
-        NimbusInfo hostPortInfo = nimbusHostPortInfo;
         if (tmpJarLocation != null) {
             //in local mode there is no jar
             try (FileInputStream fin = new FileInputStream(tmpJarLocation)) {

Reply via email to