Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2752#discussion_r207554122
  
    --- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java ---
    @@ -0,0 +1,1941 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF)
    + * under one or more contributor license agreements.
    + * See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version
    + * 2.0 (the "License"); you may not use this file except in compliance 
with the License.
    + * You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an "AS IS" 
BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions
    + * and limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.ui;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.ImmutableMap;
    +import com.google.common.collect.Lists;
    +import java.io.PrintWriter;
    +import java.io.StringWriter;
    +import java.net.URLEncoder;
    +import java.util.ArrayList;
    +import java.util.EnumSet;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +import java.util.stream.Collectors;
    +import javax.servlet.DispatcherType;
    +import javax.servlet.Servlet;
    +import javax.ws.rs.core.Response;
    +import javax.ws.rs.core.SecurityContext;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.Constants;
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.generated.Bolt;
    +import org.apache.storm.generated.BoltAggregateStats;
    +import org.apache.storm.generated.ClusterSummary;
    +import org.apache.storm.generated.CommonAggregateStats;
    +import org.apache.storm.generated.ComponentAggregateStats;
    +import org.apache.storm.generated.ComponentPageInfo;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.ExecutorSummary;
    +import org.apache.storm.generated.GetInfoOptions;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.generated.KillOptions;
    +import org.apache.storm.generated.LogConfig;
    +import org.apache.storm.generated.LogLevel;
    +import org.apache.storm.generated.LogLevelAction;
    +import org.apache.storm.generated.Nimbus;
    +import org.apache.storm.generated.NimbusSummary;
    +import org.apache.storm.generated.NodeInfo;
    +import org.apache.storm.generated.NumErrorsChoice;
    +import org.apache.storm.generated.OwnerResourceSummary;
    +import org.apache.storm.generated.ProfileAction;
    +import org.apache.storm.generated.ProfileRequest;
    +import org.apache.storm.generated.RebalanceOptions;
    +import org.apache.storm.generated.SpoutSpec;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.SupervisorPageInfo;
    +import org.apache.storm.generated.SupervisorSummary;
    +import org.apache.storm.generated.TopologyHistoryInfo;
    +import org.apache.storm.generated.TopologyInfo;
    +import org.apache.storm.generated.TopologyPageInfo;
    +import org.apache.storm.generated.TopologyStats;
    +import org.apache.storm.generated.TopologySummary;
    +import org.apache.storm.generated.WorkerSummary;
    +import org.apache.storm.logging.filters.AccessLoggingFilter;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.thrift.TException;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.TopologySpoutLag;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.utils.VersionInfo;
    +import org.apache.storm.utils.WebAppUtils;
    +import org.eclipse.jetty.http.HttpStatus;
    +import org.eclipse.jetty.http.HttpVersion;
    +import org.eclipse.jetty.server.HttpConfiguration;
    +import org.eclipse.jetty.server.HttpConnectionFactory;
    +import org.eclipse.jetty.server.SecureRequestCustomizer;
    +import org.eclipse.jetty.server.Server;
    +import org.eclipse.jetty.server.ServerConnector;
    +import org.eclipse.jetty.server.SslConnectionFactory;
    +import org.eclipse.jetty.servlet.FilterHolder;
    +import org.eclipse.jetty.servlet.ServletContextHandler;
    +import org.eclipse.jetty.servlet.ServletHolder;
    +import org.eclipse.jetty.servlets.CrossOriginFilter;
    +import org.eclipse.jetty.util.ssl.SslContextFactory;
    +import org.json.simple.JSONValue;
    +
    +public class UIHelpers {
    +
    +    // Unit ladder for uptimes counted in seconds, smallest unit first.
    +    // Each row is { unit suffix, how many of this unit make up one of
    +    // the next unit }. A null count marks the last unit, whose value
    +    // is emitted without further division.
    +    private static final Object[][] PRETTY_SEC_DIVIDERS = {
    +        new Object[]{ "s", 60 },
    +        new Object[]{ "m", 60 },
    +        new Object[]{ "h", 24 },
    +        new Object[]{ "d", null }
    +    };
    +
    +    // Same ladder with a leading millisecond row, for uptimes counted
    +    // in milliseconds.
    +    private static final Object[][] PRETTY_MS_DIVIDERS = {
    +        new Object[]{ "ms", 1000 },
    +        new Object[]{ "s", 60 },
    +        new Object[]{ "m", 60 },
    +        new Object[]{ "h", 24 },
    +        new Object[]{ "d", null }
    +    };
    +
    +    /**
    +     * Renders an integer duration as space-separated unit fields,
    +     * largest unit first (for example {@code "1m 5s"}).
    +     *
    +     * <p>Each divider row is {@code {suffix, count}}: the remainder
    +     * modulo {@code count} is emitted with {@code suffix} and the
    +     * quotient is carried to the next row. A {@code null} count emits
    +     * the carried value as is. Processing stops contributing fields
    +     * once the carried value is no longer positive, so a value of
    +     * zero or less yields an empty string.
    +     *
    +     * @param val the duration as a decimal integer string.
    +     * @param dividers unit ladder, smallest unit first.
    +     * @return prettified uptime string, possibly empty.
    +     * @throws NumberFormatException if {@code val} is not an int.
    +     */
    +    public static String prettyUptimeStr(String val, Object[][] dividers) {
    +        int remaining = Integer.parseInt(val);
    +        LinkedList<String> fields = new LinkedList<>();
    +        for (int i = 0; i < dividers.length && remaining > 0; i++) {
    +            String suffix = (String) dividers[i][0];
    +            Integer unitSize = (Integer) dividers[i][1];
    +            if (unitSize == null) {
    +                // Largest unit: nothing left to carry into.
    +                fields.addFirst(remaining + suffix);
    +                continue;
    +            }
    +            fields.addFirst((remaining % unitSize) + suffix);
    +            remaining /= unitSize;
    +        }
    +        return String.join(" ", fields);
    +    }
    +
    +    /**
    +     * Prettify an uptime given in seconds.
    +     *
    +     * <p>Delegates to {@code prettyUptimeStr} with the seconds unit
    +     * ladder (s, m, h, d).
    +     *
    +     * @param sec uptime in seconds, as a decimal integer string.
    +     * @return prettified uptime string.
    +     */
    +    public static String prettyUptimeSec(String sec) {
    +        return prettyUptimeStr(sec, PRETTY_SEC_DIVIDERS);
    +    }
    +
    +    /**
    +     * Prettify an uptime given in seconds.
    +     *
    +     * <p>Converts the value to its decimal string and formats it with
    +     * the seconds unit ladder (s, m, h, d).
    +     *
    +     * @param secs uptime in seconds.
    +     * @return prettified uptime string.
    +     */
    +    public static String prettyUptimeSec(int secs) {
    +        return prettyUptimeStr(String.valueOf(secs), PRETTY_SEC_DIVIDERS);
    +    }
    +
    +    /**
    +     * Prettify an uptime given in milliseconds.
    +     *
    +     * <p>Delegates to {@code prettyUptimeStr} with the millisecond
    +     * unit ladder (ms, s, m, h, d).
    +     *
    +     * @param ms uptime in milliseconds, as a decimal integer string.
    +     * @return prettified uptime string.
    +     */
    +    public static String prettyUptimeMs(String ms) {
    +        return prettyUptimeStr(ms, PRETTY_MS_DIVIDERS);
    +    }
    +
    +    /**
    +     * Prettify an uptime given in milliseconds.
    +     *
    +     * <p>Converts the value to its decimal string and formats it with
    +     * the millisecond unit ladder (ms, s, m, h, d).
    +     *
    +     * @param ms uptime in milliseconds.
    +     * @return prettified uptime string.
    +     */
    +    public static String prettyUptimeMs(int ms) {
    +        return prettyUptimeStr(String.valueOf(ms), PRETTY_MS_DIVIDERS);
    +    }
    +
    +    /**
    +     * URL-encodes every argument and substitutes the results into a
    +     * format string; used for log links.
    +     *
    +     * <p>Each argument is converted with {@link String#valueOf(Object)}
    +     * and encoded as UTF-8, so the result does not depend on the JVM's
    +     * default charset. The format string itself is not encoded.
    +     *
    +     * @param fmt string format
    +     * @param args hostname and other arguments.
    +     * @return the formatted string with encoded arguments.
    +     */
    +    public static String urlFormat(String fmt, Object... args) {
    +        Object[] argsEncoded = new Object[args.length];
    +        for (int i = 0; i < args.length; i++) {
    +            try {
    +                argsEncoded[i] =
    +                    URLEncoder.encode(String.valueOf(args[i]), "UTF-8");
    +            } catch (java.io.UnsupportedEncodingException e) {
    +                // UTF-8 is mandatory on every JVM; treat as a bug.
    +                throw new IllegalStateException("UTF-8 unsupported", e);
    +            }
    +        }
    +        return String.format(fmt, argsEncoded);
    +    }
    +
    +    /**
    +     * Prettified executor info.
    +     * @param e from Nimbus call
    +     * @return prettified executor info string
    +     */
    +    public static String prettyExecutorInfo(ExecutorInfo e) {
    +        return "[" + e.get_task_start() + "-" + e.get_task_end() + "]";
    +    }
    +
    +    /**
    +     * Unauthorized user json.
    +     * @param user User id.
    +     * @return Unauthorized user json.
    +     */
    +    public static Map<String, Object> unauthorizedUserJson(String user) {
    +        return ImmutableMap.of(
    +            "error", "No Authorization",
    +            "errorMessage", String.format("User %s is not authorized.", 
user));
    +    }
    +
    +    /**
    +     * Builds an HTTPS connector for the given server.
    +     *
    +     * <p>RC4 cipher suites and SSLv3 are excluded and renegotiation
    +     * is disabled. The trust store is applied only when its path,
    +     * password and type are all non-null. {@code needClientAuth}
    +     * takes precedence over {@code wantClientAuth}.
    +     */
    +    private static ServerConnector mkSslConnector(
    +            Server server, Integer port, String ksPath,
    +            String ksPassword, String ksType,
    +            String keyPassword, String tsPath,
    +            String tsPassword, String tsType,
    +            Boolean needClientAuth, Boolean wantClientAuth,
    +            Integer headerBufferSize) {
    +        SslContextFactory sslFactory = new SslContextFactory();
    +        sslFactory.setExcludeCipherSuites(
    +            "SSL_RSA_WITH_RC4_128_MD5", "SSL_RSA_WITH_RC4_128_SHA");
    +        sslFactory.setExcludeProtocols("SSLv3");
    +        sslFactory.setRenegotiationAllowed(false);
    +        sslFactory.setKeyStorePath(ksPath);
    +        sslFactory.setKeyStoreType(ksType);
    +        sslFactory.setKeyStorePassword(ksPassword);
    +        sslFactory.setKeyManagerPassword(keyPassword);
    +
    +        boolean trustStoreGiven =
    +            tsPath != null && tsPassword != null && tsType != null;
    +        if (trustStoreGiven) {
    +            sslFactory.setTrustStorePath(tsPath);
    +            sslFactory.setTrustStoreType(tsType);
    +            sslFactory.setTrustStorePassword(tsPassword);
    +        }
    +
    +        if (Boolean.TRUE.equals(needClientAuth)) {
    +            sslFactory.setNeedClientAuth(true);
    +        } else if (Boolean.TRUE.equals(wantClientAuth)) {
    +            sslFactory.setWantClientAuth(true);
    +        }
    +
    +        HttpConfiguration secureConfig = new HttpConfiguration();
    +        secureConfig.addCustomizer(new SecureRequestCustomizer());
    +        if (headerBufferSize != null) {
    +            secureConfig.setRequestHeaderSize(headerBufferSize);
    +        }
    +
    +        SslConnectionFactory sslConnections = new SslConnectionFactory(
    +            sslFactory, HttpVersion.HTTP_1_1.asString());
    +        HttpConnectionFactory httpConnections =
    +            new HttpConnectionFactory(secureConfig);
    +        ServerConnector connector =
    +            new ServerConnector(server, sslConnections, httpConnections);
    +        connector.setPort(port);
    +        return connector;
    +    }
    +
    +    /**
    +     * Adds an HTTPS connector to the server without overriding the
    +     * request header buffer size.
    +     *
    +     * <p>Equivalent to the twelve-argument overload with a
    +     * {@code null} headerBufferSize.
    +     */
    +    public static void configSsl(Server server, Integer port, String 
ksPath,
    +                                 String ksPassword, String ksType,
    +                                 String keyPassword, String tsPath,
    +                                 String tsPassword, String tsType,
    +                                 Boolean needClientAuth, Boolean 
wantClientAuth) {
    +        configSsl(server, port, ksPath, ksPassword, ksType, keyPassword,
    +                  tsPath, tsPassword, tsType, needClientAuth, 
wantClientAuth, null);
    +    }
    +
    +    /**
    +     * Adds an HTTPS connector to the server when an HTTPS port is
    +     * configured.
    +     *
    +     * <p>Nothing is added when {@code port} is {@code null} or not
    +     * positive, matching how {@code jettyCreateServer} interprets its
    +     * HTTPS port.
    +     *
    +     * @param server server to add the connector to
    +     * @param port HTTPS port; null or non-positive disables HTTPS
    +     * @param ksPath key store path
    +     * @param ksPassword key store password
    +     * @param ksType key store type
    +     * @param keyPassword key manager password
    +     * @param tsPath trust store path
    +     * @param tsPassword trust store password
    +     * @param tsType trust store type
    +     * @param needClientAuth whether client auth is required
    +     * @param wantClientAuth whether client auth is requested
    +     * @param headerBufferSize request header size, or null to keep
    +     *     the connector's default
    +     */
    +    public static void configSsl(
    +            Server server, Integer port, String ksPath,
    +            String ksPassword, String ksType,
    +            String keyPassword, String tsPath,
    +            String tsPassword, String tsType,
    +            Boolean needClientAuth,
    +            Boolean wantClientAuth, Integer headerBufferSize) {
    +        // A null port means HTTPS is not configured; unboxing it in
    +        // the comparison would throw instead of skipping.
    +        if (port == null || port <= 0) {
    +            return;
    +        }
    +        server.addConnector(
    +            mkSslConnector(
    +                server, port, ksPath, ksPassword, ksType, keyPassword,
    +                tsPath, tsPassword, tsType,
    +                needClientAuth, wantClientAuth, headerBufferSize));
    +    }
    +
    +    /**
    +     * Creates the cross-origin (CORS) filter holder used by the web
    +     * UI.
    +     *
    +     * <p>Allows any origin, the GET, POST and PUT methods, and a fixed
    +     * list of request headers. Each setting goes to its own init
    +     * parameter; writing all three to the allowed-origins parameter
    +     * would leave only the last value in place.
    +     *
    +     * @return a filter holder wrapping a configured CrossOriginFilter
    +     */
    +    public static FilterHolder corsFilterHandle() {
    +        FilterHolder filterHolder =
    +            new FilterHolder(new CrossOriginFilter());
    +        filterHolder.setInitParameter(
    +            CrossOriginFilter.ALLOWED_ORIGINS_PARAM, "*");
    +        filterHolder.setInitParameter(
    +            CrossOriginFilter.ALLOWED_METHODS_PARAM, "GET, POST, PUT");
    +        filterHolder.setInitParameter(
    +            CrossOriginFilter.ALLOWED_HEADERS_PARAM,
    +            "X-Requested-With, X-Requested-By, "
    +                + "Access-Control-Allow-Origin,"
    +                + " Content-Type, Content-Length, Accept, Origin");
    +        filterHolder.setInitParameter(
    +            CrossOriginFilter.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
    +        return filterHolder;
    +    }
    +
    +    /**
    +     * Creates a filter holder wrapping a new AccessLoggingFilter.
    +     *
    +     * @return the access-logging filter holder.
    +     */
    +    public static FilterHolder mkAccessLoggingFilterHandle() {
    +
    +        return new FilterHolder(new AccessLoggingFilter());
    +
    +    }
    +
    +    /**
    +     * Config filter without servlet init parameters.
    +     *
    +     * <p>Equivalent to the four-argument overload with {@code null}
    +     * params.
    +     */
    +    public static void configFilter(Server server, Servlet servlet, 
List<FilterConfiguration> filtersConfs) {
    +        configFilter(server, servlet, filtersConfs, null);
    +    }
    +
    +    /**
    +     * Mounts the servlet at the root context, installs the filter
    +     * chain and makes that context the server's handler.
    +     *
    +     * <p>Does nothing when {@code filtersConfs} is {@code null}.
    +     *
    +     * @param server Server
    +     * @param servlet Servlet
    +     * @param filtersConfs FiltersConfs
    +     * @param params servlet init parameters, may be null
    +     */
    +    public static void configFilter(Server server, Servlet servlet,
    +                                    List<FilterConfiguration> filtersConfs,
    +                                    Map<String, String> params) {
    +        if (filtersConfs == null) {
    +            return;
    +        }
    +        ServletHolder holder = new ServletHolder(servlet);
    +        holder.setInitOrder(0);
    +        if (params != null) {
    +            holder.setInitParameters(params);
    +        }
    +        ServletContextHandler rootContext =
    +            new ServletContextHandler(server, "/");
    +        rootContext.addServlet(holder, "/");
    +        configFilters(rootContext, filtersConfs);
    +        server.setHandler(rootContext);
    +    }
    +
    +    /**
    +     * Installs the filter chain on a servlet context.
    +     *
    +     * <p>The CORS filter is added first, then every configured filter
    +     * that names a class, then the access-logging filter. All are
    +     * mapped to {@code /*} for every dispatcher type. A filter with
    +     * no name is registered under its class name.
    +     *
    +     * @param context Servlet context
    +     * @param filtersConfs filter confs
    +     */
    +    public static void configFilters(
    +            ServletContextHandler context,
    +            List<FilterConfiguration> filtersConfs) {
    +        context.addFilter(
    +            corsFilterHandle(), "/*",
    +            EnumSet.allOf(DispatcherType.class));
    +        for (FilterConfiguration conf : filtersConfs) {
    +            String name = conf.getFilterName();
    +            String className = conf.getFilterClass();
    +            Map<String, String> initParams = conf.getFilterParams();
    +            if (className == null) {
    +                // Entries without a class cannot be instantiated.
    +                continue;
    +            }
    +            FilterHolder holder = new FilterHolder();
    +            holder.setClassName(className);
    +            holder.setName(name != null ? name : className);
    +            if (initParams == null) {
    +                initParams = new HashMap<>();
    +            }
    +            holder.setInitParameters(initParams);
    +            context.addFilter(
    +                holder, "/*", EnumSet.allOf(DispatcherType.class));
    +        }
    +        context.addFilter(
    +            mkAccessLoggingFilterHandle(), "/*",
    +            EnumSet.allOf(DispatcherType.class));
    +    }
    +
    +    /**
    +     * Construct a Jetty Server instance without overriding the
    +     * request header buffer size.
    +     *
    +     * <p>Equivalent to the four-argument overload with a {@code null}
    +     * headerBufferSize.
    +     */
    +    public static Server jettyCreateServer(Integer port, String host, 
Integer httpsPort) {
    +        return jettyCreateServer(port, host, httpsPort, null);
    +    }
    +
    +    /**
    +     * Construct a Jetty Server instance.
    +     */
    +    public static Server jettyCreateServer(Integer port, String host,
    +                                           Integer httpsPort, Integer 
headerBufferSize) {
    +        Server server = new Server();
    +
    +        if (httpsPort == null || httpsPort <= 0) {
    +            HttpConfiguration httpConfig = new HttpConfiguration();
    +            httpConfig.setSendDateHeader(true);
    +            if (null != headerBufferSize) {
    +                httpConfig.setRequestHeaderSize(headerBufferSize);
    +            }
    +            ServerConnector httpConnector = new ServerConnector(
    +                    server, new HttpConnectionFactory(httpConfig)
    +            );
    +            httpConnector.setPort(ObjectReader.getInt(port, 80));
    +            httpConnector.setIdleTimeout(200000);
    +            httpConnector.setHost(host);
    +            server.addConnector(httpConnector);
    +        }
    +
    +        return server;
    +    }
    +
    +    /**
    +     * Creates a Jetty server, lets the configurator customise it and
    +     * starts it. Modified version of run-jetty.
    +     *
    +     * <p>Assumes the configurator sets the handler. A {@code null}
    +     * configurator is skipped.
    +     *
    +     * @throws Exception if the configurator or server start fails
    +     */
    +    public static void stormRunJetty(
    +            Integer port, String host,
    +            Integer httpsPort, Integer headerBufferSize,
    +            IConfigurator configurator) throws Exception {
    +        Server jetty =
    +            jettyCreateServer(port, host, httpsPort, headerBufferSize);
    +        if (null != configurator) {
    +            configurator.execute(jetty);
    +        }
    +        jetty.start();
    +    }
    +
    +    /**
    +     * Starts Jetty on the given port with no explicit host and no
    +     * HTTPS port.
    +     *
    +     * <p>Equivalent to the five-argument overload with {@code null}
    +     * host and httpsPort.
    +     */
    +    public static void stormRunJetty(Integer port, Integer 
headerBufferSize,
    +                                     IConfigurator configurator) throws 
Exception {
    +        stormRunJetty(port, null, null, headerBufferSize, configurator);
    +    }
    +
    +    /**
    +     * wrapJsonInCallback.
    +     * @param callback callbackParameterName
    +     * @param response response
    +     * @return wrapJsonInCallback
    +     */
    +    public static String wrapJsonInCallback(String callback, String 
response) {
    +        return callback + "(" + response + ");";
    +    }
    +
    +    /**
    +     * Builds the HTTP headers sent with a JSON (or JSONP) response.
    +     *
    +     * <p>Caching is disabled and cross-origin access is opened. The
    +     * content type is JavaScript when a callback is given and JSON
    +     * otherwise. Entries in {@code headers} are applied last and so
    +     * override the defaults.
    +     *
    +     * @param callback callbackParameterName, may be null
    +     * @param headers extra headers to merge in, may be null
    +     * @return a new mutable map of header names to values
    +     */
    +    public static Map getJsonResponseHeaders(String callback, Map headers) {
    +        Map<String, String> headersResult = new HashMap<>();
    +        headersResult.put("Cache-Control", "no-cache, no-store");
    +        headersResult.put("Access-Control-Allow-Origin", "*");
    +        // The header list names Access-Control-Allow-Origin; the
    +        // former "Access-Controler-..." spelling matched no header.
    +        headersResult.put("Access-Control-Allow-Headers",
    +                          "Content-Type, Access-Control-Allow-Headers, "
    +                                  + "Access-Control-Allow-Origin, "
    +                                  + "X-Requested-By, X-Csrf-Token, "
    +                                  + "Authorization, X-Requested-With");
    +        if (callback != null) {
    +            headersResult.put(
    +                "Content-Type", "application/javascript;charset=utf-8");
    +        } else {
    +            headersResult.put(
    +                "Content-Type", "application/json;charset=utf-8");
    +        }
    +        if (headers != null) {
    +            headersResult.putAll(headers);
    +        }
    +        return headersResult;
    +    }
    +
    +    public static String getJsonResponseBody(Object data, String callback, 
boolean needSerialize) {
    +        String serializedData = needSerialize ? 
JSONValue.toJSONString(data) : (String) data;
    +        return callback != null ? wrapJsonInCallback(callback, 
serializedData) : serializedData;
    +    }
    +
    +    /**
    +     * Converts exception into json map.
    +     *
    +     * <p>The {@code error} entry is the status code followed by its
    +     * HTTP reason text; {@code errorMessage} is the full stack trace.
    +     *
    +     * @param ex Exception to be converted.
    +     * @param statusCode Status code to be returned.
    +     * @return Map to be converted into json.
    +     */
    +    public static Map exceptionToJson(Exception ex, int statusCode) {
    +        StringWriter trace = new StringWriter();
    +        PrintWriter tracePrinter = new PrintWriter(trace);
    +        ex.printStackTrace(tracePrinter);
    +        String errorLine =
    +            statusCode + " " + HttpStatus.getMessage(statusCode);
    +        String stackTrace = trace.toString();
    +        return ImmutableMap.of(
    +            "error", errorLine,
    +            "errorMessage", stackTrace);
    +    }
    +
    +    /**
    +     * Builds a 200 OK response whose body is {@code data} serialized
    +     * to JSON.
    +     *
    +     * <p>Equivalent to the four-argument overload with serialization
    +     * enabled and {@code Response.Status.OK}.
    +     */
    +    public static Response makeStandardResponse(Object data, String 
callback) {
    +        return makeStandardResponse(data, callback, true, 
Response.Status.OK);
    +    }
    +
    +    /**
    +     * Builds a response with the given status whose body is
    +     * {@code data} serialized to JSON.
    +     *
    +     * <p>Equivalent to the four-argument overload with serialization
    +     * enabled.
    +     */
    +    public static Response makeStandardResponse(Object data, String 
callback, Response.Status status) {
    +        return makeStandardResponse(data, callback, true, status);
    +    }
    +
    +    /**
    +     * Builds a response carrying the JSON body and the standard JSON
    +     * response headers.
    +     *
    +     * @param data data
    +     * @param callback callbackParameterName
    +     * @param needsSerialization whether {@code data} must still be
    +     *     serialized to JSON
    +     * @param status HTTP status of the response
    +     * @return the built response
    +     */
    +    public static Response makeStandardResponse(
    +            Object data, String callback,
    +            boolean needsSerialization, Response.Status status) {
    +        String payload =
    +            getJsonResponseBody(data, callback, needsSerialization);
    +        Response.ResponseBuilder builder =
    +            Response.status(status).entity(payload);
    +        Map<String, String> headers =
    +            getJsonResponseHeaders(callback, null);
    +        headers.forEach(
    +            (name, value) -> builder.header(name, value));
    +        return builder.build();
    +    }
    +
    +    /**
    +     * Converts thrift call result into map fit for UI/api.
    +     *
    +     * <p>Slot counts and resource figures are summed over all
    +     * supervisors; task and executor counts over all topologies. The
    +     * {@code memAssignedPercentUtil} and {@code cpuAssignedPercentUtil}
    +     * entries are the used share of the total as a string, or
    +     * {@code "0.0"} when the total is not positive. A supervisor that
    +     * reports no total for a resource contributes zero.
    +     *
    +     * @param clusterSummary Obtained from Nimbus.
    +     * @param user User Making request
    +     * @param conf Storm Conf
    +     * @return Cluster Summary for display on UI/monitoring purposes
    +     *     via API
    +     */
    +    public static Map<String, Object> getClusterSummary(
    +            ClusterSummary clusterSummary, String user,
    +            Map<String, Object> conf) {
    +        List<SupervisorSummary> supervisorSummaries =
    +                clusterSummary.get_supervisors();
    +        List<TopologySummary> topologySummaries =
    +                clusterSummary.get_topologies();
    +
    +        int usedSlots = supervisorSummaries.stream()
    +                .mapToInt(SupervisorSummary::get_num_used_workers).sum();
    +        int totalSlots = supervisorSummaries.stream()
    +                .mapToInt(SupervisorSummary::get_num_workers).sum();
    +
    +        int totalTasks = topologySummaries.stream()
    +                .mapToInt(TopologySummary::get_num_tasks).sum();
    +        int totalExecutors = topologySummaries.stream()
    +                .mapToInt(TopologySummary::get_num_executors).sum();
    +
    +        double supervisorTotalMemory = supervisorSummaries.stream()
    +                .mapToDouble(x -> clusterTotalResource(
    +                        x,
    +                        Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME,
    +                        Config.SUPERVISOR_MEMORY_CAPACITY_MB))
    +                .sum();
    +        double supervisorTotalCpu = supervisorSummaries.stream()
    +                .mapToDouble(x -> clusterTotalResource(
    +                        x,
    +                        Constants.COMMON_CPU_RESOURCE_NAME,
    +                        Config.SUPERVISOR_CPU_CAPACITY))
    +                .sum();
    +
    +        double supervisorUsedMemory = supervisorSummaries.stream()
    +                .mapToDouble(SupervisorSummary::get_used_mem).sum();
    +        double supervisorUsedCpu = supervisorSummaries.stream()
    +                .mapToDouble(SupervisorSummary::get_used_cpu).sum();
    +        double supervisorFragmentedCpu = supervisorSummaries.stream()
    +                .mapToDouble(SupervisorSummary::get_fragmented_cpu).sum();
    +        double supervisorFragmentedMem = supervisorSummaries.stream()
    +                .mapToDouble(SupervisorSummary::get_fragmented_mem).sum();
    +
    +        Map<String, Object> result = new HashMap<>();
    +        result.put("user", user);
    +        result.put("stormVersion", VersionInfo.getVersion());
    +        result.put("supervisors", supervisorSummaries.size());
    +        result.put("topologies", clusterSummary.get_topologies_size());
    +        result.put("slotsUsed", usedSlots);
    +        result.put("slotsTotal", totalSlots);
    +        result.put("slotsFree", totalSlots - usedSlots);
    +        result.put("tasksTotal", totalTasks);
    +        result.put("totalExecutors", totalExecutors);
    +
    +        result.put("totalMem", supervisorTotalMemory);
    +        result.put("totalCpu", supervisorTotalCpu);
    +        result.put("availMem",
    +                supervisorTotalMemory - supervisorUsedMemory);
    +        result.put("availCpu", supervisorTotalCpu - supervisorUsedCpu);
    +        result.put("fragmentedMem", supervisorFragmentedMem);
    +        result.put("fragmentedCpu", supervisorFragmentedCpu);
    +        result.put("schedulerDisplayResource",
    +                conf.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    +        // "Assigned" is the used share; (total - used) would report
    +        // the free share under the wrong key.
    +        result.put("memAssignedPercentUtil",
    +                clusterAssignedPercent(
    +                        supervisorUsedMemory, supervisorTotalMemory));
    +        result.put("cpuAssignedPercentUtil",
    +                clusterAssignedPercent(
    +                        supervisorUsedCpu, supervisorTotalCpu));
    +        result.put("bugtracker-url",
    +                conf.get(DaemonConfig.UI_PROJECT_BUGTRACKER_URL));
    +        result.put("central-log-url",
    +                conf.get(DaemonConfig.UI_CENTRAL_LOGGING_URL));
    +        return result;
    +    }
    +
    +    // Reads one supervisor's total for a resource, trying the common
    +    // resource name first and the supervisor config key second;
    +    // yields 0.0 when neither is reported so summing never unboxes
    +    // a null.
    +    private static double clusterTotalResource(
    +            SupervisorSummary summary, String name, String fallbackName) {
    +        Map<String, Double> totals = summary.get_total_resources();
    +        if (totals == null) {
    +            return 0.0;
    +        }
    +        Double value = totals.get(name);
    +        if (value == null) {
    +            value = totals.get(fallbackName);
    +        }
    +        return value != null ? value : 0.0;
    +    }
    +
    +    // Formats used/total as a percentage string, "0.0" when the total
    +    // is not positive.
    +    private static String clusterAssignedPercent(
    +            double used, double total) {
    +        return total > 0 ? String.valueOf(used * 100.0 / total) : "0.0";
    +    }
    +
    +    /**
    +     * Prettify OwnerResourceSummary.
    +     *
    +     * <p>Guarantee fields that are unset (or equal to the -1
    +     * sentinel) are reported as {@code "N/A"}, except
    +     * {@code isolatedNodes}, which is reported as -1 when unset.
    +     *
    +     * @param ownerResourceSummary ownerResourceSummary
    +     * @return Map of prettified OwnerResourceSummary.
    +     */
    +    public static Map<String, Object> unpackOwnerResourceSummary(
    +            OwnerResourceSummary ownerResourceSummary) {
    +        final OwnerResourceSummary s = ownerResourceSummary;
    +
    +        // -1 stands for "not set" in every guarantee below.
    +        double memGuarantee = s.is_set_memory_guarantee()
    +                ? s.get_memory_guarantee() : -1;
    +        double cpuGuarantee = s.is_set_cpu_guarantee()
    +                ? s.get_cpu_guarantee() : -1;
    +        int isolatedNodes = s.is_set_isolated_node_guarantee()
    +                ? s.get_isolated_node_guarantee() : -1;
    +        double memRemaining = s.is_set_memory_guarantee_remaining()
    +                ? s.get_memory_guarantee_remaining() : -1;
    +        double cpuRemaining = s.is_set_cpu_guarantee_remaining()
    +                ? s.get_cpu_guarantee_remaining() : -1;
    +
    +        Map<String, Object> unpacked = new HashMap<>();
    +        unpacked.put("owner", s.get_owner());
    +        unpacked.put("totalTopologies", s.get_total_topologies());
    +        unpacked.put("totalExecutors", s.get_total_executors());
    +        unpacked.put("totalWorkers", s.get_total_workers());
    +        unpacked.put("totalTasks", s.get_total_tasks());
    +        unpacked.put("totalMemoryUsage", s.get_memory_usage());
    +        unpacked.put("totalCpuUsage", s.get_cpu_usage());
    +
    +        unpacked.put("memoryGuarantee",
    +                memGuarantee != -1 ? memGuarantee : "N/A");
    +        unpacked.put("cpuGuarantee",
    +                cpuGuarantee != -1 ? cpuGuarantee : "N/A");
    +        unpacked.put("isolatedNodes", isolatedNodes);
    +
    +        unpacked.put("memoryGuaranteeRemaining",
    +                memRemaining != -1 ? memRemaining : "N/A");
    +        unpacked.put("cpuGuaranteeRemaining",
    +                cpuRemaining != -1 ? cpuRemaining : "N/A");
    +        unpacked.put("totalReqOnHeapMem",
    +                s.get_requested_on_heap_memory());
    +        unpacked.put("totalReqOffHeapMem",
    +                s.get_requested_off_heap_memory());
    +        unpacked.put("totalReqMem", s.get_requested_total_memory());
    +        unpacked.put("totalReqCpu", s.get_requested_cpu());
    +        unpacked.put("totalAssignedOnHeapMem",
    +                s.get_assigned_on_heap_memory());
    +        unpacked.put("totalAssignedOffHeapMem",
    +                s.get_assigned_off_heap_memory());
    +
    +        return unpacked;
    +    }
    +
    +    /**
    +     * Get prettified ownerResourceSummaries.
    +     *
    +     * <p>The result holds the scheduler display setting from the conf
    +     * and, under {@code owners}, one unpacked map per summary in
    +     * input order.
    +     *
    +     * @param ownerResourceSummaries ownerResourceSummaries from
    +     *     thrift call
    +     * @param conf Storm conf
    +     * @return map to be converted to json.
    +     */
    +    public static Map<String, Object> getOwnerResourceSummaries(
    +            List<OwnerResourceSummary> ownerResourceSummaries,
    +            Map<String, Object> conf) {
    +        Map<String, Object> summaries = new HashMap<>();
    +        summaries.put("schedulerDisplayResource",
    +                conf.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    +
    +        List<Map<String, Object>> owners =
    +                ownerResourceSummaries.stream()
    +                        .map(UIHelpers::unpackOwnerResourceSummary)
    +                        .collect(Collectors.toCollection(ArrayList::new));
    +        summaries.put("owners", owners);
    +        return summaries;
    +    }
    +
    +    /**
    +     * Converts a topology summary into the map that is rendered as JSON.
    +     * @param topologySummary topology summary to convert
    +     * @return map with the topology's identity, status, uptime, task/worker/executor
    +     *     counts and its requested and assigned resources.
    +     */
    +    public static Map<String, Object> getTopologyMap(TopologySummary topologySummary) {
    +        Map<String, Object> result = new HashMap<>();
    +        result.put("id", topologySummary.get_id());
    +        result.put("encodedId", URLEncoder.encode(topologySummary.get_id()));
    +        result.put("owner", topologySummary.get_owner());
    +        result.put("name", topologySummary.get_name());
    +        result.put("status", topologySummary.get_status());
    +        result.put("uptime", UIHelpers.prettyUptimeSec(topologySummary.get_uptime_secs()));
    +        result.put("uptimeSeconds", topologySummary.get_uptime_secs());
    +        result.put("tasksTotal", topologySummary.get_num_tasks());
    +        result.put("workersTotal", topologySummary.get_num_workers());
    +        result.put("executorsTotal", topologySummary.get_num_executors());
    +        result.put("replicationCount", topologySummary.get_replication_count());
    +        result.put("schedulerInfo", topologySummary.get_sched_status());
    +        result.put("requestedMemOnHeap", topologySummary.get_requested_memonheap());
    +        result.put("requestedMemOffHeap", topologySummary.get_requested_memoffheap());
    +        // The requested total is the sum of the two *requested* figures. It previously
    +        // added the assigned on-heap value, which reported a wrong total whenever the
    +        // requested and assigned on-heap memory differed.
    +        result.put("requestedTotalMem",
    +                topologySummary.get_requested_memoffheap()
    +                        + topologySummary.get_requested_memonheap());
    +        result.put("requestedCpu", topologySummary.get_requested_cpu());
    +        result.put("assignedMemOnHeap", topologySummary.get_assigned_memonheap());
    +        result.put("assignedMemOffHeap", topologySummary.get_assigned_memoffheap());
    +        result.put("assignedTotalMem",
    +                topologySummary.get_assigned_memoffheap()
    +                        + topologySummary.get_assigned_memonheap());
    +        result.put("assignedCpu", topologySummary.get_assigned_cpu());
    +        result.put("topologyVersion", topologySummary.get_topology_version());
    +        result.put("stormVersion", topologySummary.get_storm_version());
    +        return result;
    +    }
    +
    +    /**
    +     * Builds the JSON-ready resource summary of a single owner.
    +     * @param ownerResourceSummaries result of the thrift call; only the first element is used
    +     * @param client Nimbus client used to fetch the cluster's topologies
    +     * @param id owner id
    +     * @param config Storm conf
    +     * @return the owner's unpacked summary plus its "topologies"; when no summary was
    +     *     supplied, only the unpacked form of a new summary created from {@code id}.
    +     * @throws TException declared for the Nimbus call
    +     */
    +    public static Map<String, Object> getOwnerResourceSummary(
    +            List<OwnerResourceSummary> ownerResourceSummaries,
    +            Nimbus.Iface client, String id, Map<String, Object> config) throws TException {
    +        Map<String, Object> summary = new HashMap<>();
    +        summary.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    +
    +        if (ownerResourceSummaries.isEmpty()) {
    +            // Nothing known for this owner: answer with the unpacked form of a summary
    +            // built from the id alone. The entry put above is not part of that answer.
    +            return unpackOwnerResourceSummary(new OwnerResourceSummary(id));
    +        }
    +
    +        // Only topologies owned by `id` are kept by getTopologiesMap.
    +        List<Map> ownedTopologies = getTopologiesMap(id, client.getClusterInfo().get_topologies());
    +
    +        summary.putAll(unpackOwnerResourceSummary(ownerResourceSummaries.get(0)));
    +        summary.put("topologies", ownedTopologies);
    +        return summary;
    +    }
    +
    +    /**
    +     * Converts topology summaries to maps, optionally keeping a single owner's topologies.
    +     * @param id owner to keep; {@code null} keeps every topology
    +     * @param topologies topology summaries to convert
    +     * @return one map per retained topology, in input order.
    +     */
    +    private static List<Map> getTopologiesMap(String id, List<TopologySummary> topologies) {
    +        List<Map> converted = new ArrayList<>();
    +        for (TopologySummary summary : topologies) {
    +            // Skip topologies of other owners when an owner filter was given.
    +            if (id != null && !summary.get_owner().equals(id)) {
    +                continue;
    +            }
    +            converted.add(getTopologyMap(summary));
    +        }
    +        return converted;
    +    }
    +
    +    /**
    +     * Builds the logviewer URL for a log file on the given host.
    +     * @param host host name placed in the URL
    +     * @param fname file name placed in the {@code file} query parameter
    +     * @param config Storm config supplying the logviewer ports
    +     * @param port not read when building the link
    +     * @return an https link on the configured HTTPS port when the logviewer is secure,
    +     *     otherwise an http link on the plain logviewer port.
    +     */
    +    public static String getLogviewerLink(String host, String fname,
    +                                          Map<String, Object> config, int port) {
    +        // A stray ';' that followed each format string (invalid Java) has been removed.
    +        if (isSecureLogviewer(config)) {
    +            return UIHelpers.urlFormat("https://%s:%s/api/v1/log?file=%s",
    +                    host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT), fname);
    +        } else {
    +            return UIHelpers.urlFormat("http://%s:%s/api/v1/log?file=%s",
    +                    host, config.get(DaemonConfig.LOGVIEWER_PORT), fname);
    +        }
    +    }
    +
    +    /**
    +     * Get log link to nimbus log.
    +     * @param host nimbus host name
    +     * @param config storm config supplying the logviewer ports
    +     * @return https link on the HTTPS port when the logviewer is secure, otherwise an
    +     *     http link on the plain logviewer port.
    +     */
    +    public static String getNimbusLogLink(String host, Map<String, Object> config) {
    +        // A stray ';' that followed each format string (invalid Java) has been removed.
    +        if (isSecureLogviewer(config)) {
    +            return UIHelpers.urlFormat("https://%s:%s/api/v1/daemonlog?file=nimbus.log",
    +                    host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT));
    +        }
    +        return UIHelpers.urlFormat("http://%s:%s/api/v1/daemonlog?file=nimbus.log",
    +                host, config.get(DaemonConfig.LOGVIEWER_PORT));
    +    }
    +
    +    /**
    +     * Get log link to supervisor log.
    +     * @param host supervisor host name
    +     * @param config storm config supplying the logviewer ports
    +     * @return https link on the HTTPS port when the logviewer is secure, otherwise an
    +     *     http link on the plain logviewer port.
    +     */
    +    public static String getSupervisorLogLink(String host, Map<String, Object> config) {
    +        // A stray ';' that followed each format string (invalid Java) has been removed.
    +        if (isSecureLogviewer(config)) {
    +            return UIHelpers.urlFormat("https://%s:%s/api/v1/daemonlog?file=supervisor.log",
    +                    host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT));
    +        }
    +        return UIHelpers.urlFormat("http://%s:%s/api/v1/daemonlog?file=supervisor.log",
    +                host, config.get(DaemonConfig.LOGVIEWER_PORT));
    +    }
    +
    +    /**
    +     * Get log link to a worker log.
    +     * @param host worker host name
    +     * @param port worker port; used to derive the log file name
    +     * @param config storm config
    +     * @param topologyId id of the topology the worker belongs to
    +     * @return log link.
    +     */
    +    public static String getWorkerLogLink(String host, int port,
    +                                          Map<String, Object> config, String topologyId) {
    +        // The file name is derived from the topology id and the port rendered as text.
    +        String workerLogFile = WebAppUtils.logsFilename(topologyId, String.valueOf(port));
    +        return getLogviewerLink(host, workerLogFile, config, port);
    +    }
    +
    +    /**
    +     * Get supervisor info in a map.
    +     * @param supervisorSummary supervisor summary from the nimbus call
    +     * @param config Storm config, used to build the log link
    +     * @return map with identity, uptime, slot counts, total/used/available memory and
    +     *     cpu, log link and version of the supervisor.
    +     */
    +    public static Map<String, Object> getPrettifiedSupervisorMap(
    +            SupervisorSummary supervisorSummary,
    +            Map<String, Object> config) {
    +        Map<String, Object> info = new HashMap<>();
    +
    +        info.put("id", supervisorSummary.get_supervisor_id());
    +        info.put("host", supervisorSummary.get_host());
    +        info.put("uptime", UIHelpers.prettyUptimeSec(supervisorSummary.get_uptime_secs()));
    +        info.put("uptimeSeconds", supervisorSummary.get_uptime_secs());
    +        info.put("slotsTotal", supervisorSummary.get_num_workers());
    +        info.put("slotsUsed", supervisorSummary.get_num_used_workers());
    +        // Free slots are clamped so the value is never negative.
    +        int unusedSlots = supervisorSummary.get_num_workers() - supervisorSummary.get_num_used_workers();
    +        info.put("slotsFree", Math.max(unusedSlots, 0));
    +
    +        // Totals are read under the common resource name first, falling back to the
    +        // supervisor capacity config key when the common name is absent.
    +        Map<String, Double> capacity = supervisorSummary.get_total_resources();
    +        Double totalMemory = capacity.getOrDefault(
    +                Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME,
    +                capacity.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB));
    +        info.put("totalMem", totalMemory);
    +        Double totalCpu = capacity.getOrDefault(
    +                Constants.COMMON_CPU_RESOURCE_NAME,
    +                capacity.get(Config.SUPERVISOR_CPU_CAPACITY));
    +        info.put("totalCpu", totalCpu);
    +
    +        info.put("usedMem", supervisorSummary.get_used_mem());
    +        info.put("usedCpu", supervisorSummary.get_used_cpu());
    +        info.put("logLink", getSupervisorLogLink(supervisorSummary.get_host(), config));
    +        // NOTE(review): these subtractions unbox the totals and would fail if neither
    +        // key was present in the total resources map - confirm that cannot happen.
    +        info.put("availMem", totalMemory - supervisorSummary.get_used_mem());
    +        info.put("availCpu", totalCpu - supervisorSummary.get_used_cpu());
    +        info.put("version", supervisorSummary.get_version());
    +        return info;
    +    }
    +
    +    /**
    +     * Get topology history.
    +     * @param topologyHistory topology history from the Nimbus call
    +     * @return map with the topology ids stored under "topo-history".
    +     */
    +    public static Map<String, Object> getTopologyHistoryInfo(TopologyHistoryInfo topologyHistory) {
    +        Map<String, Object> history = new HashMap<>();
    +        history.put("topo-history", topologyHistory.get_topo_ids());
    +        return history;
    +    }
    +
    +    /**
    +     * Check if logviewer is secure.
    +     * @param config Storm config.
    +     * @return true if the config contains the logviewer HTTPS port and it is not negative.
    +     */
    +    public static boolean isSecureLogviewer(Map<String, Object> config) {
    +        if (!config.containsKey(DaemonConfig.LOGVIEWER_HTTPS_PORT)) {
    +            return false;
    +        }
    +        int httpsPort = (int) config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT);
    +        return httpsPort >= 0;
    +    }
    +
    +    /**
    +     * Get logviewer port depending on whether the logviewer is secure or 
not.
    +     * @param config Storm config.
    +     * @return appropriate port.
    +     */
    +    public static int getLogviewerPort(Map<String, Object> config) {
    +        if (isSecureLogviewer(config)) {
    +            return (int) config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT);
    +        }
    +        return (int) config.get(DaemonConfig.LOGVIEWER_PORT);
    +    }
    +
    +    /**
    +     * Converts the worker summaries of a supervisor page into maps.
    +     * @param supervisorPageInfo page info whose worker summaries are converted
    +     * @param config Storm config, passed on for building worker log links
    +     * @return one map per worker; empty when no worker summaries are set.
    +     */
    +    public static List<Map> getWorkerSummaries(SupervisorPageInfo supervisorPageInfo,
    +                                               Map<String, Object> config) {
    +        List<Map> workers = new ArrayList<>();
    +        if (!supervisorPageInfo.is_set_worker_summaries()) {
    +            return workers;
    +        }
    +        for (WorkerSummary worker : supervisorPageInfo.get_worker_summaries()) {
    +            workers.add(getWorkerSummaryMap(worker, config));
    +        }
    +        return workers;
    +    }
    +
    +    /**
    +     * Converts one worker summary into a map.
    +     * @param workerSummary worker summary to convert
    +     * @param config Storm config, used for the worker log link
    +     * @return map with the worker's location, topology, executor count, assigned
    +     *     resources, per-component task counts, uptime and log link.
    +     */
    +    private static Map getWorkerSummaryMap(WorkerSummary workerSummary, Map<String, Object> config) {
    +        Map<String, Object> worker = new HashMap<>();
    +        worker.put("supervisorId", workerSummary.get_supervisor_id());
    +        worker.put("host", workerSummary.get_host());
    +        worker.put("port", workerSummary.get_port());
    +        worker.put("topologyId", workerSummary.get_topology_id());
    +        worker.put("topologyName", workerSummary.get_topology_name());
    +        worker.put("executorsTotal", workerSummary.get_num_executors());
    +        worker.put("assignedMemOnHeap", workerSummary.get_assigned_memonheap());
    +        worker.put("assignedMemOffHeap", workerSummary.get_assigned_memoffheap());
    +        worker.put("assignedCpu", workerSummary.get_assigned_cpu());
    +        worker.put("componentNumTasks", workerSummary.get_component_to_num_tasks());
    +        worker.put("uptime", UIHelpers.prettyUptimeSec(workerSummary.get_uptime_secs()));
    +        worker.put("uptimeSeconds", workerSummary.get_uptime_secs());
    +
    +        String logLink = getWorkerLogLink(
    +                workerSummary.get_host(),
    +                workerSummary.get_port(),
    +                config,
    +                workerSummary.get_topology_id());
    +        worker.put("workerLogLink", logLink);
    +        return worker;
    +    }
    +
    +    /**
    +     * Builds the supervisor summary response.
    +     * @param supervisors supervisor summary list.
    +     * @param securityContext injected security context; not read by this method.
    +     * @param config Storm config.
    +     * @return map with logviewer port/scheme, the "supervisors" list and
    +     *     "schedulerDisplayResource".
    +     */
    +    public static Map<String, Object> getSupervisorSummary(
    +            List<SupervisorSummary> supervisors,
    +            SecurityContext securityContext, Map<String, Object> config) {
    +        Map<String, Object> response = new HashMap<>();
    +        addLogviewerInfo(config, response);
    +        response.put("supervisors", getSupervisorsMap(supervisors, config));
    +        response.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    +        return response;
    +    }
    +
    +    /**
    +     * Converts each supervisor summary into its prettified map.
    +     * @param supervisors supervisor summaries to convert
    +     * @param config Storm config
    +     * @return one map per supervisor, in input order.
    +     */
    +    private static List<Map> getSupervisorsMap(List<SupervisorSummary> supervisors,
    +                                               Map<String, Object> config) {
    +        List<Map> prettified = new ArrayList<>();
    +        supervisors.forEach(summary -> prettified.add(getPrettifiedSupervisorMap(summary, config)));
    +        return prettified;
    +    }
    +
    +    /**
    +     * Adds "logviewerPort" and "logviewerScheme" to the given result map.
    +     * @param config Storm config the port and security setting are read from
    +     * @param result map that receives the two entries
    +     */
    +    private static void addLogviewerInfo(Map<String, Object> config, Map<String, Object> result) {
    +        result.put("logviewerPort", getLogviewerPort(config));
    +        result.put("logviewerScheme", isSecureLogviewer(config) ? "https" : "http");
    +    }
    +
    +    /**
    +     * Builds the response for a supervisor page.
    +     * @param supervisorPageInfo supervisor page info to convert
    +     * @param config Storm config
    +     * @return map with "workers", "schedulerDisplayResource", "supervisors" and the
    +     *     logviewer port/scheme.
    +     */
    +    public static Map<String, Object> getSupervisorPageInfo(
    +            SupervisorPageInfo supervisorPageInfo, Map<String, Object> config) {
    +        Map<String, Object> page = new HashMap<>();
    +        page.put("workers", getWorkerSummaries(supervisorPageInfo, config));
    +        page.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    +        page.put("supervisors", getSupervisorsMap(supervisorPageInfo.get_supervisor_summaries(), config));
    +        addLogviewerInfo(config, page);
    +        return page;
    +    }
    +
    +    /**
    +     * Builds the summary of all topologies.
    +     * @param topologies topology summaries to convert; none are filtered out
    +     * @param config Storm config
    +     * @return map with the "topologies" list and "schedulerDisplayResource".
    +     */
    +    public static Map<String, Object> getAllTopologiesSummary(
    +            List<TopologySummary> topologies, Map<String, Object> config) {
    +        Map<String, Object> summary = new HashMap<>();
    +        // A null owner id disables the owner filter in getTopologiesMap.
    +        List<Map> allTopologies = getTopologiesMap(null, topologies);
    +        summary.put("topologies", allTopologies);
    +        summary.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    +        return summary;
    +    }
    +
    +    /**
    +     * Turns a window key into its display text.
    +     * @param window window key
    +     * @return "All time" for ":all-time", otherwise the window passed through
    +     *     {@code prettyUptimeSec}.
    +     */
    +    public static String getWindowHint(String window) {
    +        return window.equals(":all-time") ? "All time" : UIHelpers.prettyUptimeSec(window);
    +    }
    +
    +    /**
    +     * Re-keys a window-to-value map by the display hint of each window.
    +     * @param rawDisplayMap values keyed by window
    +     * @return new map with the same values keyed by {@code getWindowHint(window)}.
    +     */
    +    public static Map<String, Double> getStatDisplayMap(Map<String, Double> rawDisplayMap) {
    +        Map<String, Double> byHint = new HashMap<>();
    +        rawDisplayMap.forEach((window, value) -> byHint.put(getWindowHint(window), value));
    +        return byHint;
    +    }
    +
    +    /**
    +     * Builds the topology page response.
    +     * @param topologyPageInfo topology page info to convert
    +     * @param window window key the page was requested for
    +     * @param config Storm config
    +     * @param remoteUser user name stored under "user"
    +     * @return the unpacked topology info extended with user, window, message timeout,
    +     *     the parsed topology configuration and the scheduler display-resource setting.
    +     */
    +    public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
    +                                                         String window,
    +                                                         Map<String, Object> config,
    +                                                         String remoteUser) {
    +        Map<String, Object> summary = new HashMap<>();
    +        Map<String, Object> parsedConf =
    +                (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
    +        long msgTimeoutSecs = (long) parsedConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
    +
    +        summary.putAll(unpackTopologyInfo(topologyPageInfo, window, config));
    +        summary.put("user", remoteUser);
    +        summary.put("window", window);
    +        summary.put("windowHint", getWindowHint(window));
    +        summary.put("msgTimeout", msgTimeoutSecs);
    +        // Replaces the "configuration" entry contributed by unpackTopologyInfo with
    +        // the parsed form.
    +        summary.put("configuration", parsedConf);
    +        summary.put("visualizationTable", new ArrayList());
    +        summary.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
    +        return summary;
    +    }
    +
    +    /**
    +     * Copies a window-to-count map into a new map; the keys are left as they are.
    +     * @param windowToTransferred counts keyed by window
    +     * @return new map with the same entries.
    +     */
    +    private static Map<String, Long> getStatDisplayMapLong(Map<String, Long> windowToTransferred) {
    +        Map<String, Long> copy = new HashMap<>();
    +        windowToTransferred.forEach(copy::put);
    +        return copy;
    +    }
    +
    +    /**
    +     * Converts the stats shared by spouts and bolts into a map.
    +     * @param commonAggregateStats common aggregate stats of a component
    +     * @return map with executor/task counts, tuple counters and the requested on-heap
    +     *     memory, off-heap memory and cpu taken from the resources map.
    +     */
    +    private static Map<String, Object> getCommonAggStatsMap(CommonAggregateStats commonAggregateStats) {
    +        Map<String, Object> stats = new HashMap<>();
    +        stats.put("executors", commonAggregateStats.get_num_executors());
    +        stats.put("tasks", commonAggregateStats.get_num_tasks());
    +        stats.put("emitted", commonAggregateStats.get_emitted());
    +        stats.put("transferred", commonAggregateStats.get_transferred());
    +        stats.put("acked", commonAggregateStats.get_acked());
    +        stats.put("failed", commonAggregateStats.get_failed());
    +        stats.put("requestedMemOnHeap",
    +                commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME));
    +        stats.put("requestedMemOffHeap",
    +                commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME));
    +        stats.put("requestedCpu",
    +                commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME));
    +        return stats;
    +    }
    +
    +    /**
    +     * Converts the aggregate stats of a spout into a map.
    +     * @param componentAggregateStats aggregate stats of the spout
    +     * @param spoutId id of the spout
    +     * @return the common stats plus the spout id, its URL-encoded form and the
    +     *     complete latency.
    +     */
    +    private static Map<String, Object> getSpoutAggStatsMap(
    +            ComponentAggregateStats componentAggregateStats, String spoutId) {
    +        Map<String, Object> spout = new HashMap<>();
    +        spout.putAll(getCommonAggStatsMap(componentAggregateStats.get_common_stats()));
    +        spout.put("spoutId", spoutId);
    +        spout.put("encodedSpoutId", URLEncoder.encode(spoutId));
    +        spout.put("completeLatency",
    +                componentAggregateStats.get_specific_stats().get_spout().get_complete_latency_ms());
    +        return spout;
    +    }
    +
    +    /**
    +     * Converts the aggregate stats of a bolt into a map.
    +     * @param componentAggregateStats aggregate stats of the bolt
    +     * @param boltId id of the bolt
    +     * @return the common stats plus the bolt id, its URL-encoded form, the executed
    +     *     count, and capacity and latencies formatted by {@code StatsUtil.floatStr}.
    +     */
    +    private static Map<String, Object> getBoltAggStatsMap(
    +            ComponentAggregateStats componentAggregateStats, String boltId) {
    +        Map<String, Object> bolt = new HashMap<>();
    +        bolt.putAll(getCommonAggStatsMap(componentAggregateStats.get_common_stats()));
    +        bolt.put("boltId", boltId);
    +        bolt.put("encodedBoltId", URLEncoder.encode(boltId));
    +
    +        BoltAggregateStats specific = componentAggregateStats.get_specific_stats().get_bolt();
    +        bolt.put("capacity", StatsUtil.floatStr(specific.get_capacity()));
    +        bolt.put("executeLatency", StatsUtil.floatStr(specific.get_execute_latency_ms()));
    +        bolt.put("executed", specific.get_executed());
    +        bolt.put("processLatency", StatsUtil.floatStr(specific.get_process_latency_ms()));
    +        return bolt;
    +    }
    +
    +    /**
    +     * Converts topology stats into one map per window.
    +     * @param topologyStats topology stats holding the per-window counters
    +     * @return list with one entry per window of the emitted map, each holding the raw
    +     *     window key, its display hint and that window's emitted, transferred,
    +     *     complete latency, acked and failed values.
    +     */
    +    private static List<Map> getTopologyStatsMap(TopologyStats topologyStats) {
    +        List<Map> result = new ArrayList<>();
    +
    +        Map<String, Long> emittedStatDisplayMap =
    +                getStatDisplayMapLong(topologyStats.get_window_to_emitted());
    +        Map<String, Long> transferred =
    +                getStatDisplayMapLong(topologyStats.get_window_to_transferred());
    +        // Keep the latencies keyed by the raw window. They used to be re-keyed by the
    +        // display hint (":all-time" became "All time"), so the lookup by raw window
    +        // key in the loop below could not find them.
    +        Map<String, Double> completeLatency = topologyStats.get_window_to_complete_latencies_ms();
    +        Map<String, Long> acked = getStatDisplayMapLong(topologyStats.get_window_to_acked());
    +        Map<String, Long> failed = getStatDisplayMapLong(topologyStats.get_window_to_failed());
    +
    +        for (String window : emittedStatDisplayMap.keySet()) {
    +            Map<String, Object> temp = new HashMap<>();
    +            temp.put("windowPretty", getWindowHint(window));
    +            temp.put("window", window);
    +            temp.put("emitted", emittedStatDisplayMap.get(window));
    +            temp.put("transferred", transferred.get(window));
    +            temp.put("completeLatency", completeLatency.get(window));
    +            temp.put("acked", acked.get(window));
    +            temp.put("failed", failed.get(window));
    +            result.add(temp);
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     * Converts topology page info into the map rendered for the topology page.
    +     * @param topologyPageInfo topology page info to convert
    +     * @param window window key; not read by this method
    +     * @param config Storm config, used for the worker log links
    +     * @return map with identity, status, counts, requested/assigned resources, topology
    +     *     stats, workers, spouts, bolts, the raw topology configuration, debug
    +     *     settings, replication count and versions.
    +     */
    +    private static Map<String, Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo,
    +                                                          String window,
    +                                                          Map<String, Object> config) {
    +        Map<String, Object> info = new HashMap<>();
    +
    +        // Identity, status and counts.
    +        info.put("id", topologyPageInfo.get_id());
    +        info.put("encodedId", URLEncoder.encode(topologyPageInfo.get_id()));
    +        info.put("owner", topologyPageInfo.get_owner());
    +        info.put("name", topologyPageInfo.get_name());
    +        info.put("status", topologyPageInfo.get_status());
    +        info.put("uptime", UIHelpers.prettyUptimeSec(topologyPageInfo.get_uptime_secs()));
    +        info.put("uptimeSeconds", topologyPageInfo.get_uptime_secs());
    +        info.put("tasksTotal", topologyPageInfo.get_num_tasks());
    +        info.put("workersTotal", topologyPageInfo.get_num_workers());
    +        info.put("executorsTotal", topologyPageInfo.get_num_executors());
    +        info.put("schedulerInfo", topologyPageInfo.get_sched_status());
    +
    +        // Requested and assigned resources, with totals as on-heap plus off-heap.
    +        info.put("requestedMemOnHeap", topologyPageInfo.get_requested_memonheap());
    +        info.put("requestedMemOffHeap", topologyPageInfo.get_requested_memoffheap());
    +        info.put("requestedCpu", topologyPageInfo.get_requested_cpu());
    +        info.put("requestedTotalMem",
    +                topologyPageInfo.get_requested_memonheap() + topologyPageInfo.get_requested_memoffheap());
    +        info.put("assignedMemOnHeap", topologyPageInfo.get_assigned_memonheap());
    +        info.put("assignedMemOffHeap", topologyPageInfo.get_assigned_memoffheap());
    +        info.put("assignedTotalMem",
    +                topologyPageInfo.get_assigned_memonheap() + topologyPageInfo.get_assigned_memoffheap());
    +        info.put("assignedCpu", topologyPageInfo.get_assigned_cpu());
    +        info.put("requestedRegularOnHeapMem", topologyPageInfo.get_requested_regular_on_heap_memory());
    +        info.put("requestedSharedOnHeapMem", topologyPageInfo.get_requested_shared_on_heap_memory());
    +        info.put("requestedRegularOffHeapMem", topologyPageInfo.get_requested_regular_off_heap_memory());
    +        info.put("requestedSharedOffHeapMem", topologyPageInfo.get_requested_shared_off_heap_memory());
    +        info.put("assignedRegularOnHeapMem", topologyPageInfo.get_assigned_regular_on_heap_memory());
    +        info.put("assignedSharedOnHeapMem", topologyPageInfo.get_assigned_shared_on_heap_memory());
    +        info.put("assignedRegularOffHeapMem", topologyPageInfo.get_assigned_regular_off_heap_memory());
    +        info.put("assignedSharedOffHeapMem", topologyPageInfo.get_assigned_shared_off_heap_memory());
    +
    +        info.put("topologyStats", getTopologyStatsMap(topologyPageInfo.get_topology_stats()));
    +
    +        // Workers are optional; the list stays empty when none are set.
    +        List<Map> workers = new ArrayList<>();
    +        if (topologyPageInfo.is_set_workers()) {
    +            topologyPageInfo.get_workers()
    +                    .forEach(worker -> workers.add(getWorkerSummaryMap(worker, config)));
    +        }
    +        info.put("workers", workers);
    +
    +        List<Map> spoutStats = new ArrayList<>();
    +        topologyPageInfo.get_id_to_spout_agg_stats()
    +                .forEach((spoutId, stats) -> spoutStats.add(getSpoutAggStatsMap(stats, spoutId)));
    +        info.put("spouts", spoutStats);
    +
    +        List<Map> boltStats = new ArrayList<>();
    +        topologyPageInfo.get_id_to_bolt_agg_stats()
    +                .forEach((boltId, stats) -> boltStats.add(getBoltAggStatsMap(stats, boltId)));
    +        info.put("bolts", boltStats);
    +
    +        info.put("configuration", topologyPageInfo.get_topology_conf());
    +
    +        // Debugging counts as enabled only when debug options are set and switched on.
    +        boolean debuggingEnabled = topologyPageInfo.is_set_debug_options()
    +                && topologyPageInfo.get_debug_options().is_enable();
    +        info.put("debug", debuggingEnabled);
    +        // 10 is reported as the sampling percentage while debugging is off.
    +        double samplingPct = debuggingEnabled
    +                ? topologyPageInfo.get_debug_options().get_samplingpct()
    +                : 10;
    +        info.put("samplingPct", samplingPct);
    +
    +        info.put("replicationCount", topologyPageInfo.get_replication_count());
    +        info.put("topologyVersion", topologyPageInfo.get_topology_version());
    +        info.put("stormVersion", topologyPageInfo.get_storm_version());
    +        return info;
    +    }
    +
    +    /**
    +     * Lists the distinct host/port pairs of a topology's executors.
    +     * @param topologyInfo topology info whose executors are inspected
    +     * @param config Storm config, used for the logviewer port and scheme
    +     * @return map with the de-duplicated "hostPortList" plus "logviewerPort" and
    +     *     "logviewerScheme".
    +     */
    +    public static Map<String, Object> getTopologyWorkers(TopologyInfo topologyInfo, Map config) {
    +        // Collecting into a set drops executors that share the same host and port.
    +        Set<Map<String, Object>> distinctHostPorts = new HashSet<>();
    +        for (ExecutorSummary executor : topologyInfo.get_executors()) {
    +            Map<String, Object> hostPort = new HashMap<>();
    +            hostPort.put("host", executor.get_host());
    +            hostPort.put("port", executor.get_port());
    +            distinctHostPorts.add(hostPort);
    +        }
    +        List<Map> hostPortList = new ArrayList<>(distinctHostPorts);
    +
    +        Map<String, Object> response = new HashMap<>();
    +        response.put("hostPortList", hostPortList);
    +        addLogviewerInfo(config, response);
    +        return response;
    +    }
    +
    +
    +    /**
    +     * getTopologyLag.
    +     * @param userTopology userTopology
    +     * @param config config
    +     * @return getTopologyLag.
    +     */
    +    public static Map<String, Map<String, Object>> 
getTopologyLag(StormTopology userTopology, Map<String,Object> config) {
    +        return TopologySpoutLag.lag(userTopology, config);
    +    }
    +
    +    /**
    +     * Groups the executors of bolt components by component id.
    +     * @param executorSummaries executors to group
    +     * @param stormTopology topology used to look up each component's type
    +     * @param sys whether executors of system components are included
    +     * @return bolt executors keyed by component id.
    +     */
    +    public static Map<String, List<ExecutorSummary>> getBoltExecutors(
    +            List<ExecutorSummary> executorSummaries, StormTopology stormTopology, boolean sys) {
    +        Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
    +        for (ExecutorSummary executor : executorSummaries) {
    +            if (!StatsUtil.componentType(stormTopology, executor.get_component_id()).equals("bolt")) {
    +                continue;
    +            }
    +            // System components are dropped unless they were explicitly asked for.
    +            if (!sys && Utils.isSystemId(executor.get_component_id())) {
    +                continue;
    +            }
    +            byComponent.computeIfAbsent(executor.get_component_id(), id -> new ArrayList<>())
    +                    .add(executor);
    +        }
    +        return byComponent;
    +    }
    +
    +    /**
    +     * Groups the executors of spout components by component id.
    +     * @param executorSummaries executors to group
    +     * @param stormTopology topology used to look up each component's type
    +     * @return spout executors keyed by component id.
    +     */
    +    public static Map<String, List<ExecutorSummary>> getSpoutExecutors(
    +            List<ExecutorSummary> executorSummaries, StormTopology stormTopology) {
    +        Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
    +        for (ExecutorSummary executor : executorSummaries) {
    +            if (!StatsUtil.componentType(stormTopology, executor.get_component_id()).equals("spout")) {
    +                continue;
    +            }
    +            byComponent.computeIfAbsent(executor.get_component_id(), id -> new ArrayList<>())
    +                    .add(executor);
    +        }
    +        return byComponent;
    +    }
    +
    +    /** Matches one character (line terminators excepted) that is not a letter, '_', '-', ':' or '.'. */
    +    private static final Pattern STREAM_NAME_ILLEGAL_CHAR = Pattern.compile("(?![A-Za-z_\\-:\\.]).");
    +
    +    /** Matches the first character of a name when it is an ASCII letter. */
    +    private static final Pattern STREAM_NAME_LEADING_LETTER = Pattern.compile("^[A-Za-z]");
    +
    +    /**
    +     * Replaces every character of a stream name that is not an ASCII letter, '_', '-',
    +     * ':' or '.' with '_'. Names that do not start with an ASCII letter get a prefix
    +     * before the replacement. The two patterns are compiled once instead of per call.
    +     * @param streamName stream name to sanitize
    +     * @return the sanitized stream name.
    +     */
    +    public static String sanitizeStreamName(String streamName) {
    +        // NOTE(review): the prefix is the two-character text backslash + 's'. The
    +        // backslash is itself replaced, so such names come out starting with "_s".
    +        // Confirm whether a plain "s" prefix was intended.
    +        String candidate = STREAM_NAME_LEADING_LETTER.matcher(streamName).find()
    +                ? streamName
    +                : "\\s" + streamName;
    +        return STREAM_NAME_ILLEGAL_CHAR.matcher(candidate).replaceAll("_");
    +    }
    +
    +    /**
    +     * Copies a two-level stats map, passing every inner key through
    +     * {@link #sanitizeStreamName(String)}.
    +     *
    +     * @param stats map from outer key to (stream name to count); outer keys and counts are
    +     *     copied unchanged
    +     * @return a new map with the same outer keys whose inner maps are keyed by the sanitized
    +     *     stream names
    +     */
    +    public static  Map<String, Map<String,Long>> sanitizeTransferredStats(Map<String, Map<String,Long>> stats) {
    +        Map<String, Map<String,Long>> sanitized = new HashMap<>();
    +        stats.forEach((outerKey, countsByStream) -> {
    +            Map<String,Long> renamed = new HashMap<>();
    +            countsByStream.forEach((streamName, count) -> renamed.put(sanitizeStreamName(streamName), count));
    +            sanitized.put(outerKey, renamed);
    +        });
    +        return sanitized;
    +    }
    +
    +    /**
    +     * Builds the per-executor stat map from an executor summary.
    +     *
    +     * <p>The map holds the keys {@code "host"}, {@code "port"}, {@code "uptime_secs"} and
    +     * {@code "transferred"}. {@code "transferred"} is the executor's transferred stats run
    +     * through {@link #sanitizeTransferredStats(Map)}, or {@code null} when the summary has no
    +     * stats set.
    +     *
    +     * @param executorSummary executor summary to read from
    +     * @return a new map with the four entries described above
    +     */
    +    public static Map<String, Object> getStatMapFromExecutorSummary(ExecutorSummary executorSummary) {
    +        Map<String, Object> statMap = new HashMap<>();
    +        statMap.put("host", executorSummary.get_host());
    +        statMap.put("port", executorSummary.get_port());
    +        statMap.put("uptime_secs", executorSummary.get_uptime_secs());
    +        // The key is always present; its value stays null when no stats are set.
    +        statMap.put("transferred", executorSummary.is_set_stats()
    +                ? sanitizeTransferredStats(executorSummary.get_stats().get_transferred())
    +                : null);
    +        return statMap;
    +    }
    +
    +    /**
    +     * Describes one input of a component as a JSON-friendly map.
    +     *
    +     * <p>The map holds {@code "component"} and {@code "stream"} (taken from the entry's
    +     * {@code GlobalStreamId}), {@code "sani-stream"} (the stream id passed through
    +     * {@link #sanitizeStreamName(String)}) and {@code "grouping"}.
    +     *
    +     * <p>{@code "grouping"} is the name of the field that is set on the {@code Grouping}
    +     * union (for example {@code "direct"}), or {@code null} when no field is set. Storing
    +     * the union object itself made the JSON output contain its {@code toString()} form,
    +     * e.g. {@code <Grouping direct:NullStruct()>}.
    +     *
    +     * @param entryInput source stream and the grouping used to subscribe to it
    +     * @return a new map with the four entries described above
    +     */
    +    public static Map<String, Object> getInputMap(Map.Entry<GlobalStreamId,Grouping> entryInput) {
    +        GlobalStreamId source = entryInput.getKey();
    +        String streamId = source.get_streamId();
    +        Grouping._Fields groupingType = entryInput.getValue().getSetField();
    +
    +        Map<String, Object> result = new HashMap<>();
    +        result.put("component", source.get_componentId());
    +        result.put("stream", streamId);
    +        result.put("sani-stream", sanitizeStreamName(streamId));
    +        // Emit the grouping's name, not the Thrift union, so it serializes as a JSON string.
    +        result.put("grouping", groupingType == null ? null : groupingType.getFieldName());
    +        return result;
    +    }
    +
    +    /**
    +     * Collects the data behind the topology visualization.
    +     *
    +     * <p>The result maps each component id to a map with the keys {@code "type"}
    +     * ({@code "spout"} or {@code "bolt"}), {@code "capacity"}, {@code "latency"},
    +     * {@code "transferred"}, {@code "stats"} (one map per executor, see
    +     * {@link #getStatMapFromExecutorSummary}), {@code "link"} and {@code "inputs"} (one map
    +     * per input, see {@link #getInputMap}). A component is only included when it has at
    +     * least one executor in the topology info.
    +     *
    +     * <p>Two defects of the previous version are fixed here: bolts were filtered with an
    +     * inverted system-id check (so no bolt was emitted when {@code sys} was false), and
    +     * {@code "inputs"} was written to the top-level result, where each component overwrote
    +     * the inputs of the previous one.
    +     *
    +     * @param client Nimbus client used to fetch the topology and its info
    +     * @param window key used to select the latency and transferred values from the
    +     *     per-window stream stats
    +     * @param topoId id of the topology
    +     * @param sys whether bolts with a system component id are included
    +     * @return map from component id to that component's visualization data
    +     * @throws TException if a Nimbus call fails
    +     */
    +    public static Map<String, Object> getVisualizationData(
    +            Nimbus.Iface client, String window, String topoId, boolean sys) throws TException {
    +        GetInfoOptions getInfoOptions = new GetInfoOptions();
    +        getInfoOptions.set_num_err_choice(NumErrorsChoice.ONE);
    +        TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(topoId, getInfoOptions);
    +        StormTopology stormTopology = client.getTopology(topoId);
    +        Map<String, List<ExecutorSummary>> boltSummaries =
    +                getBoltExecutors(topologyInfo.get_executors(), stormTopology, sys);
    +        Map<String, List<ExecutorSummary>> spoutSummaries =
    +                getSpoutExecutors(topologyInfo.get_executors(), stormTopology);
    +
    +        Map<String, SpoutSpec> spoutSpecs = stormTopology.get_spouts();
    +        Map<String, Bolt> boltSpecs = stormTopology.get_bolts();
    +
    +        Map<String, Object> result = new HashMap<>();
    +
    +        for (Map.Entry<String, SpoutSpec> spoutSpecMapEntry : spoutSpecs.entrySet()) {
    +            String spoutComponentId = spoutSpecMapEntry.getKey();
    +            if (!spoutSummaries.containsKey(spoutComponentId)) {
    +                continue;
    +            }
    +            List<ExecutorSummary> spoutExecutors = spoutSummaries.get(spoutComponentId);
    +            Map<String, Map> spoutStreamsStats = StatsUtil.spoutStreamsStats(spoutExecutors, true);
    +
    +            Map<String, Object> spoutData = new HashMap<>();
    +            spoutData.put("type", "spout");
    +            spoutData.put("capacity", 0);
    +            spoutData.put("latency", spoutStreamsStats.get("complete-latencies").get(window));
    +            spoutData.put("transferred", spoutStreamsStats.get("transferred").get(window));
    +            spoutData.put("stats", spoutExecutors.stream()
    +                    .map(UIHelpers::getStatMapFromExecutorSummary)
    +                    .collect(Collectors.toList()));
    +            spoutData.put("link",
    +                    UIHelpers.urlFormat("/component.html?id=%s&topology_id=%s", spoutComponentId, topoId));
    +            // Inputs belong to the component, not to the top-level result.
    +            spoutData.put("inputs", spoutSpecMapEntry.getValue().get_common().get_inputs().entrySet().stream()
    +                    .map(UIHelpers::getInputMap)
    +                    .collect(Collectors.toList()));
    +            result.put(spoutComponentId, spoutData);
    +        }
    +
    +        for (Map.Entry<String, Bolt> boltEntry : boltSpecs.entrySet()) {
    +            String boltComponentId = boltEntry.getKey();
    +            // Same rule as getBoltExecutors: system bolts are shown only when sys is true.
    +            if (!boltSummaries.containsKey(boltComponentId)
    +                    || !(sys || !Utils.isSystemId(boltComponentId))) {
    +                continue;
    +            }
    +            List<ExecutorSummary> boltExecutors = boltSummaries.get(boltComponentId);
    +            Map<String, Map> boltStreamsStats = StatsUtil.boltStreamsStats(boltExecutors, true);
    +
    +            Map<String, Object> boltMap = new HashMap<>();
    +            boltMap.put("type", "bolt");
    +            boltMap.put("capacity", StatsUtil.computeBoltCapacity(boltExecutors));
    +            boltMap.put("latency", boltStreamsStats.get("process-latencies").get(window));
    +            boltMap.put("transferred", boltStreamsStats.get("transferred").get(window));
    +            boltMap.put("stats", boltExecutors.stream()
    +                    .map(UIHelpers::getStatMapFromExecutorSummary)
    +                    .collect(Collectors.toList()));
    +            boltMap.put("link",
    +                    UIHelpers.urlFormat("/component.html?id=%s&topology_id=%s", boltComponentId, topoId));
    +            // Inputs belong to the component, not to the top-level result.
    +            boltMap.put("inputs", boltEntry.getValue().get_common().get_inputs().entrySet().stream()
    +                    .map(UIHelpers::getInputMap)
    +                    .collect(Collectors.toList()));
    +            result.put(boltComponentId, boltMap);
    +        }
    +        return result;
    +    }
    +
    +    /**
    +     * Builds the stream-box entry for one visualization element.
    +     *
    +     * <p>Reads the map stored under {@code "inputs"} in {@code visualization} and returns a
    +     * new map with {@code "stream"} and {@code "sani-stream"} copied from it, plus
    +     * {@code "checked"}, which is true when the stream id is not a system id.
    +     *
    +     * <p>NOTE(review): this casts the {@code "inputs"} value to a {@code Map}, while
    +     * {@code getVisualizationData} stores a {@code List} under that key - confirm what
    +     * callers actually pass in.
    +     *
    +     * @param visualization a {@code Map<String, Object>} holding an {@code "inputs"} entry
    +     * @return a new map with the keys {@code "stream"}, {@code "sani-stream"} and
    +     *     {@code "checked"}
    +     */
    +    public static Map<String, Object> getStreamBox(Object visualization) {
    +        Map<String, Object> inputs =
    +                (Map<String, Object>) ((Map<String, Object>) visualization).get("inputs");
    +        Object stream = inputs.get("stream");
    +
    +        Map<String, Object> streamBox = new HashMap<>();
    +        streamBox.put("stream", stream);
    +        streamBox.put("sani-stream", inputs.get("sani-stream"));
    +        streamBox.put("checked", !Utils.isSystemId((String) stream));
    +        return streamBox;
    +    }
    +
    +    /**
    +     * getBuildVisualization.
    +     * @param client client
    +     * @param config config
    +     * @param window window
    +     * @param id id
    +     * @param sys sys
    +     * @ret
    --- End diff --
    
    I'm not totally sure what happened here, but the visualization does not work 
because the groupings are not being mapped to JSON correctly.
    
    ```
    
{"inputs":[{"component":"__acker","sani-stream":"_s__ack_reset_timeout","stream":"__ack_reset_timeout","grouping":<Grouping
 
direct:NullStruct()>},{"component":"__acker","sani-stream":"_s__ack_ack","stream":"__ack_ack","grouping":<Grouping
 
direct:NullStruct()>},{"component":"__acker","sani-stream":"_s__ack_fail","stream":"__ack_fail","grouping":<Grouping
 
direct:NullStruct()>}],"spout":{"transferred":2800,"stats":[{"transferred":{"10800":{"default":2800},"600":{"default":2800},"86400":{"default":2800},":all-time":{"default":2800}},"port":6700,"host":"10.0.1.12","uptime_secs":61},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":560},":all-time":{"default":560}},"port":6701,"host":"10.0.1.12","uptime_secs":60},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":560},":all-time":{"default":560}},"port":6702,"host":"10.0.1.12","uptime_secs":61},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":5
 
60},":all-time":{"default":560}},"port":6700,"host":"10.0.1.12","uptime_secs":61},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":560},":all-time":{"default":560}},"port":6702,"host":"10.0.1.12","uptime_secs":61}],"latency":null,"link":"\/component.html?id=spout&topology_id=wc-1-1533304535","type":"spout","capacity":0}}
    ```
    
    `<Grouping direct:NullStruct()>` is not what we want as output.


---

Reply via email to