Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2752#discussion_r207554122 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java --- @@ -0,0 +1,1941 @@ +/** + * Licensed to the Apache Software Foundation (ASF) + * under one or more contributor license agreements. + * See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.daemon.ui; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import javax.servlet.DispatcherType; +import javax.servlet.Servlet; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.SecurityContext; + +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.DaemonConfig; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.BoltAggregateStats; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.CommonAggregateStats; +import org.apache.storm.generated.ComponentAggregateStats; +import org.apache.storm.generated.ComponentPageInfo; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.GetInfoOptions; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.LogConfig; +import org.apache.storm.generated.LogLevel; +import org.apache.storm.generated.LogLevelAction; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.NimbusSummary; +import org.apache.storm.generated.NodeInfo; +import org.apache.storm.generated.NumErrorsChoice; +import org.apache.storm.generated.OwnerResourceSummary; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.RebalanceOptions; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.SupervisorPageInfo; +import org.apache.storm.generated.SupervisorSummary; +import org.apache.storm.generated.TopologyHistoryInfo; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologyPageInfo; +import org.apache.storm.generated.TopologyStats; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.generated.WorkerSummary; +import org.apache.storm.logging.filters.AccessLoggingFilter; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.thrift.TException; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.TopologySpoutLag; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.apache.storm.utils.WebAppUtils; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.CrossOriginFilter; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.json.simple.JSONValue; + +public class UIHelpers { + + private static final Object[][] PRETTY_SEC_DIVIDERS = { + new Object[]{ "s", 60 }, + new Object[]{ "m", 60 }, + new Object[]{ "h", 24 }, + new Object[]{ "d", null } + }; + + private static final Object[][] PRETTY_MS_DIVIDERS = { + new Object[]{ "ms", 1000 }, + new Object[]{ "s", 60 }, + new Object[]{ "m", 60 }, + new Object[]{ "h", 24 }, + new Object[]{ "d", null } + }; + + /** + * Prettify uptime string. + * @param val val. + * @param dividers dividers. + * @return prettified uptime string. + */ + public static String prettyUptimeStr(String val, Object[][] dividers) { + int uptime = Integer.parseInt(val); + LinkedList<String> tmp = new LinkedList<>(); + for (Object[] divider : dividers) { + if (uptime > 0) { + String state = (String) divider[0]; + Integer div = (Integer) divider[1]; + if (div != null) { + tmp.addFirst(uptime % div + state); + uptime = uptime / div; + } else { + tmp.addFirst(uptime + state); + } + } + } + return Joiner.on(" ").join(tmp); + } + + /** + * Prettify uptime string. + * @param sec uptime in seconds. + * @return prettified uptime string. + */ + public static String prettyUptimeSec(String sec) { + return prettyUptimeStr(sec, PRETTY_SEC_DIVIDERS); + } + + /** + * prettyUptimeSec. + * @param secs secs + * @return prettyUptimeSec + */ + public static String prettyUptimeSec(int secs) { + return prettyUptimeStr(String.valueOf(secs), PRETTY_SEC_DIVIDERS); + } + + /** + * prettyUptimeMs. + * @param ms ms + * @return prettyUptimeMs + */ + public static String prettyUptimeMs(String ms) { + return prettyUptimeStr(ms, PRETTY_MS_DIVIDERS); + } + + /** + * prettyUptimeMs. + * @param ms ms + * @return prettyUptimeMs + */ + public static String prettyUptimeMs(int ms) { + return prettyUptimeStr(String.valueOf(ms), PRETTY_MS_DIVIDERS); + } + + /** + * url formatter for log links. + * @param fmt string format + * @param args hostname and other arguments. + * @return string formatter + */ + public static String urlFormat(String fmt, Object... args) { + String[] argsEncoded = new String[args.length]; + for (int i = 0; i < args.length; i++) { + argsEncoded[i] = URLEncoder.encode(String.valueOf(args[i])); + } + return String.format(fmt, argsEncoded); + } + + /** + * Prettified executor info. + * @param e from Nimbus call + * @return prettified executor info string + */ + public static String prettyExecutorInfo(ExecutorInfo e) { + return "[" + e.get_task_start() + "-" + e.get_task_end() + "]"; + } + + /** + * Unauthorized user json. + * @param user User id. + * @return Unauthorized user json. + */ + public static Map<String, Object> unauthorizedUserJson(String user) { + return ImmutableMap.of( + "error", "No Authorization", + "errorMessage", String.format("User %s is not authorized.", user)); + } + + private static ServerConnector mkSslConnector(Server server, Integer port, String ksPath, + String ksPassword, String ksType, + String keyPassword, String tsPath, + String tsPassword, String tsType, + Boolean needClientAuth, Boolean wantClientAuth, + Integer headerBufferSize) { + SslContextFactory factory = new SslContextFactory(); + factory.setExcludeCipherSuites("SSL_RSA_WITH_RC4_128_MD5", "SSL_RSA_WITH_RC4_128_SHA"); + factory.setExcludeProtocols("SSLv3"); + factory.setRenegotiationAllowed(false); + factory.setKeyStorePath(ksPath); + factory.setKeyStoreType(ksType); + factory.setKeyStorePassword(ksPassword); + factory.setKeyManagerPassword(keyPassword); + + if (tsPath != null && tsPassword != null && tsType != null) { + factory.setTrustStorePath(tsPath); + factory.setTrustStoreType(tsType); + factory.setTrustStorePassword(tsPassword); + } + + if (needClientAuth != null && needClientAuth) { + factory.setNeedClientAuth(true); + } else if (wantClientAuth != null && wantClientAuth) { + factory.setWantClientAuth(true); + } + + HttpConfiguration httpsConfig = new HttpConfiguration(); + httpsConfig.addCustomizer(new SecureRequestCustomizer()); + if (null != headerBufferSize) { + httpsConfig.setRequestHeaderSize(headerBufferSize); + } + ServerConnector sslConnector = new ServerConnector( + server, + new SslConnectionFactory(factory, HttpVersion.HTTP_1_1.asString()), + new HttpConnectionFactory(httpsConfig) + ); + sslConnector.setPort(port); + return sslConnector; + } + + public static void configSsl(Server server, Integer port, String ksPath, + String ksPassword, String ksType, + String keyPassword, String tsPath, + String tsPassword, String tsType, + Boolean needClientAuth, Boolean wantClientAuth) { + configSsl(server, port, ksPath, ksPassword, ksType, keyPassword, + tsPath, tsPassword, tsType, needClientAuth, wantClientAuth, null); + } + + /** + * configSsl. + * @param server server + * @param port port + * @param ksPath ksPath + * @param ksPassword ksPassword + * @param ksType ksType + * @param keyPassword keyPassword + * @param tsPath tsPath + * @param tsPassword tsPassword + * @param tsType tsType + * @param needClientAuth needClientAuth + * @param wantClientAuth wantClientAuth + * @param headerBufferSize headerBufferSize + */ + public static void configSsl(Server server, Integer port, String ksPath, + String ksPassword, String ksType, + String keyPassword, String tsPath, + String tsPassword, String tsType, + Boolean needClientAuth, + Boolean wantClientAuth, Integer headerBufferSize) { + if (port > 0) { + server.addConnector( + mkSslConnector( + server, port, ksPath, ksPassword, ksType, keyPassword, + tsPath, tsPassword, tsType, + needClientAuth, wantClientAuth, headerBufferSize + ) + ); + } + } + + /** + * corsFilterHandle. + * @return corsFilterHandle + */ + public static FilterHolder corsFilterHandle() { + FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter()); + filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, "*"); + filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, "GET, POST, PUT"); + filterHolder.setInitParameter( + CrossOriginFilter.ALLOWED_ORIGINS_PARAM, + "X-Requested-With, X-Requested-By, Access-Control-Allow-Origin," + + " Content-Type, Content-Length, Accept, Origin"); + filterHolder.setInitParameter(CrossOriginFilter.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*"); + return filterHolder; + } + + /** + * mkAccessLoggingFilterHandle. + * @return mkAccessLoggingFilterHandle + */ + public static FilterHolder mkAccessLoggingFilterHandle() { + + return new FilterHolder(new AccessLoggingFilter()); + + } + + public static void configFilter(Server server, Servlet servlet, List<FilterConfiguration> filtersConfs) { + configFilter(server, servlet, filtersConfs, null); + } + + /** + * Config filter. + * @param server Server + * @param servlet Servlet + * @param filtersConfs FiltersConfs + * @param params Filter params + */ + public static void configFilter(Server server, Servlet servlet, + List<FilterConfiguration> filtersConfs, + Map<String, String> params) { + if (filtersConfs != null) { + ServletHolder servletHolder = new ServletHolder(servlet); + servletHolder.setInitOrder(0); + if (params != null) { + servletHolder.setInitParameters(params); + } + ServletContextHandler context = new ServletContextHandler(server, "/"); + context.addServlet(servletHolder, "/"); + configFilters(context, filtersConfs); + server.setHandler(context); + } + } + + /** + * Config filters. + * @param context Servlet context + * @param filtersConfs filter confs + */ + public static void configFilters(ServletContextHandler context, + List<FilterConfiguration> filtersConfs) { + context.addFilter(corsFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class)); + for (FilterConfiguration filterConf : filtersConfs) { + String filterName = filterConf.getFilterName(); + String filterClass = filterConf.getFilterClass(); + Map<String, String> filterParams = filterConf.getFilterParams(); + if (filterClass != null) { + FilterHolder filterHolder = new FilterHolder(); + filterHolder.setClassName(filterClass); + if (filterName != null) { + filterHolder.setName(filterName); + } else { + filterHolder.setName(filterClass); + } + if (filterParams != null) { + filterHolder.setInitParameters(filterParams); + } else { + filterHolder.setInitParameters(new HashMap<>()); + } + context.addFilter(filterHolder, "/*", EnumSet.allOf(DispatcherType.class)); + } + } + context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class)); + } + + /** + * Construct a Jetty Server instance. + */ + public static Server jettyCreateServer(Integer port, String host, Integer httpsPort) { + return jettyCreateServer(port, host, httpsPort, null); + } + + /** + * Construct a Jetty Server instance. + */ + public static Server jettyCreateServer(Integer port, String host, + Integer httpsPort, Integer headerBufferSize) { + Server server = new Server(); + + if (httpsPort == null || httpsPort <= 0) { + HttpConfiguration httpConfig = new HttpConfiguration(); + httpConfig.setSendDateHeader(true); + if (null != headerBufferSize) { + httpConfig.setRequestHeaderSize(headerBufferSize); + } + ServerConnector httpConnector = new ServerConnector( + server, new HttpConnectionFactory(httpConfig) + ); + httpConnector.setPort(ObjectReader.getInt(port, 80)); + httpConnector.setIdleTimeout(200000); + httpConnector.setHost(host); + server.addConnector(httpConnector); + } + + return server; + } + + /** + * Modified version of run-jetty + * Assumes configurator sets handler. + */ + public static void stormRunJetty(Integer port, String host, + Integer httpsPort, Integer headerBufferSize, + IConfigurator configurator) throws Exception { + Server s = jettyCreateServer(port, host, httpsPort, headerBufferSize); + if (configurator != null) { + configurator.execute(s); + } + s.start(); + } + + public static void stormRunJetty(Integer port, Integer headerBufferSize, + IConfigurator configurator) throws Exception { + stormRunJetty(port, null, null, headerBufferSize, configurator); + } + + /** + * wrapJsonInCallback. + * @param callback callbackParameterName + * @param response response + * @return wrapJsonInCallback + */ + public static String wrapJsonInCallback(String callback, String response) { + return callback + "(" + response + ");"; + } + + /** + * getJsonResponseHeaders. + * @param callback callbackParameterName + * @param headers headers + * @return getJsonResponseHeaders + */ + public static Map getJsonResponseHeaders(String callback, Map headers) { + Map<String, String> headersResult = new HashMap<>(); + headersResult.put("Cache-Control", "no-cache, no-store"); + headersResult.put("Access-Control-Allow-Origin", "*"); + headersResult.put("Access-Control-Allow-Headers", + "Content-Type, Access-Control-Allow-Headers, " + + "Access-Controler-Allow-Origin, " + + "X-Requested-By, X-Csrf-Token, " + + "Authorization, X-Requested-With"); + if (callback != null) { + headersResult.put("Content-Type", "application/javascript;charset=utf-8"); + } else { + headersResult.put("Content-Type", "application/json;charset=utf-8"); + } + if (headers != null) { + headersResult.putAll(headers); + } + return headersResult; + } + + public static String getJsonResponseBody(Object data, String callback, boolean needSerialize) { + String serializedData = needSerialize ? JSONValue.toJSONString(data) : (String) data; + return callback != null ? wrapJsonInCallback(callback, serializedData) : serializedData; + } + + /** + * Converts exception into json map. + * @param ex Exception to be converted. + * @param statusCode Status code to be returned. + * @return Map to be converted into json. + */ + public static Map exceptionToJson(Exception ex, int statusCode) { + StringWriter sw = new StringWriter(); + ex.printStackTrace(new PrintWriter(sw)); + return ImmutableMap.of( + "error", statusCode + + " " + + HttpStatus.getMessage(statusCode), + "errorMessage", sw.toString()); + } + + public static Response makeStandardResponse(Object data, String callback) { + return makeStandardResponse(data, callback, true, Response.Status.OK); + } + + public static Response makeStandardResponse(Object data, String callback, Response.Status status) { + return makeStandardResponse(data, callback, true, status); + } + + /** + * makeStandardResponse. + * @param data data + * @param callback callbackParameterName + * @param needsSerialization needsSerialization + * @return makeStandardResponse + */ + public static Response makeStandardResponse( + Object data, String callback, boolean needsSerialization, Response.Status status) { + String body = getJsonResponseBody(data, callback, needsSerialization); + Response.ResponseBuilder responseBuilder = Response.status(status).entity(body); + Map<String, String> headers = getJsonResponseHeaders(callback, null); + for (Map.Entry<String, String> headerEntry: headers.entrySet()) { + responseBuilder.header(headerEntry.getKey(), headerEntry.getValue()); + } + return responseBuilder.build(); + } + + /** + * Converts thrift call result into map fit for UI/api. + * @param clusterSummary Obtained from Nimbus. + * @param user User Making request + * @param conf Storm Conf + * @return Cluster Summary for display on UI/monitoring purposes via API + */ + public static Map<String, Object> getClusterSummary(ClusterSummary clusterSummary, String user, + Map<String, Object> conf) { + Map<String, Object> result = new HashMap(); + List<SupervisorSummary> supervisorSummaries = clusterSummary.get_supervisors(); + List<TopologySummary> topologySummaries = clusterSummary.get_topologies(); + + int usedSlots = + supervisorSummaries.stream().mapToInt( + SupervisorSummary::get_num_used_workers).sum(); + int totalSlots = + supervisorSummaries.stream().mapToInt( + SupervisorSummary::get_num_workers).sum(); + + int totalTasks = + topologySummaries.stream().mapToInt( + TopologySummary::get_num_tasks).sum(); + int totalExecutors = + topologySummaries.stream().mapToInt( + TopologySummary::get_num_executors).sum(); + + double supervisorTotalMemory = + supervisorSummaries.stream().mapToDouble(x -> x.get_total_resources().getOrDefault( + Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, + x.get_total_resources().get(Config.SUPERVISOR_MEMORY_CAPACITY_MB) + ) + ).sum(); + + double supervisorTotalCpu = + supervisorSummaries.stream().mapToDouble(x -> x.get_total_resources().getOrDefault( + Constants.COMMON_CPU_RESOURCE_NAME, + x.get_total_resources().get(Config.SUPERVISOR_CPU_CAPACITY) + ) + ).sum(); + + double supervisorUsedMemory = + supervisorSummaries.stream().mapToDouble(SupervisorSummary::get_used_mem).sum(); + double supervisorUsedCpu = + supervisorSummaries.stream().mapToDouble(SupervisorSummary::get_used_cpu).sum(); + double supervisorFragementedCpu = + supervisorSummaries.stream().mapToDouble( + SupervisorSummary::get_fragmented_cpu).sum(); + double supervisorFragmentedMem = + supervisorSummaries.stream().mapToDouble( + SupervisorSummary::get_fragmented_mem).sum(); + + + result.put("user", user); + result.put("stormVersion", VersionInfo.getVersion()); + result.put("supervisors", supervisorSummaries.size()); + result.put("topologies", clusterSummary.get_topologies_size()); + result.put("slotsUsed", usedSlots); + result.put("slotsTotal", totalSlots); + result.put("slotsFree", totalSlots - usedSlots); + result.put("tasksTotal", totalTasks); + result.put("totalExecutors", totalExecutors); + + result.put("totalMem", supervisorTotalMemory); + result.put("totalCpu", supervisorTotalCpu); + result.put("availMem", supervisorTotalMemory - supervisorUsedMemory); + result.put("availCpu", supervisorTotalCpu - supervisorUsedCpu); + result.put("fragmentedMem", supervisorFragmentedMem); + result.put("fragmentedCpu", supervisorFragementedCpu); + result.put("schedulerDisplayResource", + conf.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE)); + result.put("memAssignedPercentUtil", supervisorTotalMemory > 0 + ? String.valueOf((supervisorTotalMemory - supervisorUsedMemory) * 100.0 + / supervisorTotalMemory) : "0.0"); + result.put("cpuAssignedPercentUtil", supervisorTotalCpu > 0 + ? String.valueOf((supervisorTotalCpu - supervisorUsedCpu) * 100.0 + / supervisorTotalCpu) : "0.0"); + result.put("bugtracker-url", conf.get(DaemonConfig.UI_PROJECT_BUGTRACKER_URL)); + result.put("central-log-url", conf.get(DaemonConfig.UI_CENTRAL_LOGGING_URL)); + return result; + } + + /** + * Prettify OwnerResourceSummary. + * @param ownerResourceSummary ownerResourceSummary + * @return Map of prettified OwnerResourceSummary. + */ + public static Map<String, Object> unpackOwnerResourceSummary( + OwnerResourceSummary ownerResourceSummary) { + + Double memoryGuarantee = Double.valueOf(-1); + if (ownerResourceSummary.is_set_memory_guarantee()) { + memoryGuarantee = ownerResourceSummary.get_memory_guarantee(); + } + + Double cpuGuaranteee = Double.valueOf(-1); + if (ownerResourceSummary.is_set_cpu_guarantee()) { + cpuGuaranteee = ownerResourceSummary.get_cpu_guarantee(); + } + + int isolatedNodeGuarantee = -1; + if (ownerResourceSummary.is_set_isolated_node_guarantee()) { + isolatedNodeGuarantee = ownerResourceSummary.get_isolated_node_guarantee(); + } + + Double memoryGuaranteeRemaining = Double.valueOf(-1); + if (ownerResourceSummary.is_set_memory_guarantee_remaining()) { + memoryGuaranteeRemaining = ownerResourceSummary.get_memory_guarantee_remaining(); + } + + Double cpuGuaranteeRemaining = Double.valueOf(-1); + if (ownerResourceSummary.is_set_cpu_guarantee_remaining()) { + cpuGuaranteeRemaining = ownerResourceSummary.get_cpu_guarantee_remaining(); + } + + Map<String, Object> result = new HashMap(); + result.put("owner", ownerResourceSummary.get_owner()); + result.put("totalTopologies", ownerResourceSummary.get_total_topologies()); + result.put("totalExecutors", ownerResourceSummary.get_total_executors()); + result.put("totalWorkers", ownerResourceSummary.get_total_workers()); + result.put("totalTasks", ownerResourceSummary.get_total_tasks()); + result.put("totalMemoryUsage", ownerResourceSummary.get_memory_usage()); + result.put("totalCpuUsage", ownerResourceSummary.get_cpu_usage()); + + result.put("memoryGuarantee", memoryGuarantee != -1 ? memoryGuarantee : "N/A"); + result.put("cpuGuarantee", cpuGuaranteee != -1 ? cpuGuaranteee : "N/A"); + result.put("isolatedNodes", isolatedNodeGuarantee); + + result.put("memoryGuaranteeRemaining", + memoryGuaranteeRemaining != -1 ? memoryGuaranteeRemaining : "N/A"); + result.put("cpuGuaranteeRemaining", + cpuGuaranteeRemaining != -1 ? cpuGuaranteeRemaining : "N/A"); + result.put("totalReqOnHeapMem", ownerResourceSummary.get_requested_on_heap_memory()); + + result.put("totalReqOffHeapMem", ownerResourceSummary.get_requested_off_heap_memory()); + + result.put("totalReqMem", ownerResourceSummary.get_requested_total_memory()); + result.put("totalReqCpu", ownerResourceSummary.get_requested_cpu()); + result.put("totalAssignedOnHeapMem", + ownerResourceSummary.get_assigned_on_heap_memory() + ); + result.put("totalAssignedOffHeapMem", ownerResourceSummary.get_assigned_off_heap_memory()); + + return result; + } + + /** + * Get prettified ownerResourceSummaries. + * @param ownerResourceSummaries ownerResourceSummaries from thrift call + * @param conf Storm conf + * @return map to be converted to json. + */ + public static Map<String, Object> getOwnerResourceSummaries( + List<OwnerResourceSummary> ownerResourceSummaries, Map<String, Object> conf) { + Map<String, Object> result = new HashMap(); + + result.put("schedulerDisplayResource", conf.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE)); + + List<Map<String, Object>> ownerSummaries = new ArrayList(); + + for (OwnerResourceSummary ownerResourceSummary : ownerResourceSummaries) { + ownerSummaries.add(unpackOwnerResourceSummary(ownerResourceSummary)); + } + result.put("owners", ownerSummaries); + return result; + } + + /** + * getTopologyMap. + * @param topologySummary topologySummary + * @return getTopologyMap + */ + public static Map<String, Object> getTopologyMap(TopologySummary topologySummary) { + Map<String, Object> result = new HashMap(); + result.put("id", topologySummary.get_id()); + result.put("encodedId", URLEncoder.encode(topologySummary.get_id())); + result.put("owner", topologySummary.get_owner()); + result.put("name", topologySummary.get_name()); + result.put("status", topologySummary.get_status()); + result.put("uptime", UIHelpers.prettyUptimeSec(topologySummary.get_uptime_secs())); + result.put("uptimeSeconds", topologySummary.get_uptime_secs()); + result.put("tasksTotal", topologySummary.get_num_tasks()); + result.put("workersTotal", topologySummary.get_num_workers()); + result.put("executorsTotal", topologySummary.get_num_executors()); + result.put("replicationCount", topologySummary.get_replication_count()); + result.put("schedulerInfo", topologySummary.get_sched_status()); + result.put("requestedMemOnHeap", topologySummary.get_requested_memonheap()); + result.put("requestedMemOffHeap", topologySummary.get_requested_memoffheap()); + result.put("requestedTotalMem", + topologySummary.get_requested_memoffheap() + + topologySummary.get_assigned_memonheap()); + result.put("requestedCpu", topologySummary.get_requested_cpu()); + result.put("assignedMemOnHeap", topologySummary.get_assigned_memonheap()); + result.put("assignedMemOffHeap", topologySummary.get_assigned_memoffheap()); + result.put("assignedTotalMem", + topologySummary.get_assigned_memoffheap() + + topologySummary.get_assigned_memonheap()); + result.put("assignedCpu", topologySummary.get_assigned_cpu()); + result.put("topologyVersion", topologySummary.get_topology_version()); + result.put("stormVersion", topologySummary.get_storm_version()); + return result; + } + + /** + * Get a specific owner resource summary. + * @param ownerResourceSummaries Result from thrift call. + * @param client client + * @param id Owner id. + * @param config Storm conf. + * @return prettified owner resource summary. + */ + public static Map<String, Object> getOwnerResourceSummary( + List<OwnerResourceSummary> ownerResourceSummaries, + Nimbus.Iface client, String id, Map<String, Object> config) throws TException { + Map<String, Object> result = new HashMap(); + result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE)); + + if (ownerResourceSummaries.isEmpty()) { + return unpackOwnerResourceSummary(new OwnerResourceSummary(id)); + } + + List<TopologySummary> topologies = null; + topologies = client.getClusterInfo().get_topologies(); + List<Map> topologySummaries = getTopologiesMap(id, topologies); + + result.putAll(unpackOwnerResourceSummary(ownerResourceSummaries.get(0))); + result.put("topologies", topologySummaries); + + return result; + } + + /** + * getTopologiesMap. + * @param id id + * @param topologies topologies + * @return getTopologiesMap + */ + private static List<Map> getTopologiesMap(String id, List<TopologySummary> topologies) { + List<Map> topologySummaries = new ArrayList(); + + for (TopologySummary topologySummary : topologies) { + if (id == null || topologySummary.get_owner().equals(id)) { + topologySummaries.add(getTopologyMap(topologySummary)); + } + } + return topologySummaries; + } + + /** + * getLogviewerLink. + * @param host host + * @param fname fname + * @param config config + * @param port port + * @return getLogviewerLink. + */ + public static String getLogviewerLink(String host, String fname, + Map<String, Object> config, int port) { + if (isSecureLogviewer(config)) { + return UIHelpers.urlFormat("https://%s:%s/api/v1/log?file=%s", + host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT), fname); + } else { + return UIHelpers.urlFormat("http://%s:%s/api/v1/log?file=%s", + host, config.get(DaemonConfig.LOGVIEWER_PORT), fname); + } + } + + /** + * Get log link to nimbus log. + * @param host nimbus host name + * @param config storm config + * @return log link. + */ + public static String getNimbusLogLink(String host, Map<String, Object> config) { + if (isSecureLogviewer(config)) { + return UIHelpers.urlFormat("https://%s:%s/api/v1/daemonlog?file=nimbus.log", + host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT)); + } + return UIHelpers.urlFormat("http://%s:%s/api/v1/daemonlog?file=nimbus.log", + host, config.get(DaemonConfig.LOGVIEWER_PORT)); + } + + /** + * Get log link to supervisor log. + * @param host supervisor host name + * @param config storm config + * @return log link. + */ + public static String getSupervisorLogLink(String host, Map<String, Object> config) { + if (isSecureLogviewer(config)) { + return UIHelpers.urlFormat("https://%s:%s/api/v1/daemonlog?file=supervisor.log", + host, config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT)); + } + return UIHelpers.urlFormat("http://%s:%s/api/v1/daemonlog?file=supervisor.log", + host, config.get(DaemonConfig.LOGVIEWER_PORT)); + } + + /** + * Get log link to supervisor log. + * @param host supervisor host name + * @param config storm config + * @return log link. + */ + public static String getWorkerLogLink(String host, int port, + Map<String, Object> config, String topologyId) { + return getLogviewerLink(host, + WebAppUtils.logsFilename( + topologyId, String.valueOf(port)), + config, port + ); + } + + /** + * Get supervisor info in a map. + * @param supervisorSummary from nimbus call. + * @param config Storm config. + * @return prettified supervisor info map. + */ + public static Map<String, Object> getPrettifiedSupervisorMap( + SupervisorSummary supervisorSummary, + Map<String, Object> config) { + Map<String, Object> result = new HashMap(); + + result.put("id", supervisorSummary.get_supervisor_id()); + result.put("host", supervisorSummary.get_host()); + result.put("uptime", UIHelpers.prettyUptimeSec(supervisorSummary.get_uptime_secs())); + result.put("uptimeSeconds", supervisorSummary.get_uptime_secs()); + result.put("slotsTotal", supervisorSummary.get_num_workers()); + result.put("slotsUsed", supervisorSummary.get_num_used_workers()); + result.put("slotsFree", + Integer.max(supervisorSummary.get_num_workers() + - supervisorSummary.get_num_used_workers(), 0)); + Map<String, Double> totalResources = supervisorSummary.get_total_resources(); + Double totalMemory = totalResources.getOrDefault( + Constants.COMMON_TOTAL_MEMORY_RESOURCE_NAME, + totalResources.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB) + ); + result.put("totalMem", + totalMemory + ); + Double totalCpu = totalResources.getOrDefault( + Constants.COMMON_CPU_RESOURCE_NAME, + totalResources.get(Config.SUPERVISOR_CPU_CAPACITY) + ); + result.put("totalCpu", + totalCpu); + result.put("usedMem", supervisorSummary.get_used_mem()); + result.put("usedCpu", supervisorSummary.get_used_cpu()); + result.put( + "logLink", + getSupervisorLogLink(supervisorSummary.get_host(), config) + ); + result.put("availMem", totalMemory - supervisorSummary.get_used_mem()); + result.put("availCpu", totalCpu - supervisorSummary.get_used_cpu()); + result.put("version", supervisorSummary.get_version()); + return result; + } + + /** + * Get topology history. + * @param topologyHistory from Nimbus call. + * @return map ready to be returned. + */ + public static Map<String, Object> getTopologyHistoryInfo(TopologyHistoryInfo topologyHistory) { + Map<String, Object> result = new HashMap(); + result.put("topo-history", topologyHistory.get_topo_ids()); + return result; + } + + /** + * Check if logviewer is secure. + * @param config Storm config. + * @return true if logiviwer is secure. + */ + public static boolean isSecureLogviewer(Map<String, Object> config) { + if (config.containsKey(DaemonConfig.LOGVIEWER_HTTPS_PORT)) { + int logviewerPort = (int) config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT); + if (logviewerPort >= 0) { + return true; + } + } + return false; + } + + /** + * Get logviewer port depending on whether the logviewer is secure or not. + * @param config Storm config. + * @return appropriate port. + */ + public static int getLogviewerPort(Map<String, Object> config) { + if (isSecureLogviewer(config)) { + return (int) config.get(DaemonConfig.LOGVIEWER_HTTPS_PORT); + } + return (int) config.get(DaemonConfig.LOGVIEWER_PORT); + } + + /** + * getWorkerSummaries. + * @param supervisorPageInfo supervisorPageInfo + * @param config config + * @return getWorkerSummaries + */ + public static List<Map> getWorkerSummaries(SupervisorPageInfo supervisorPageInfo, + Map<String, Object> config) { + List<Map> workerSummaries = new ArrayList(); + if (supervisorPageInfo.is_set_worker_summaries()) { + for (WorkerSummary workerSummary : supervisorPageInfo.get_worker_summaries()) { + workerSummaries.add(getWorkerSummaryMap(workerSummary, config)); + } + } + return workerSummaries; + } + + /** + * getWorkerSummaryMap. + * @param workerSummary workerSummary + * @param config config + * @return getWorkerSummaryMap + */ + private static Map getWorkerSummaryMap(WorkerSummary workerSummary, Map<String, Object> config) { + Map<String, Object> result = new HashMap(); + result.put("supervisorId", workerSummary.get_supervisor_id()); + result.put("host", workerSummary.get_host()); + result.put("port", workerSummary.get_port()); + result.put("topologyId", workerSummary.get_topology_id()); + result.put("topologyName", workerSummary.get_topology_name()); + result.put("executorsTotal", workerSummary.get_num_executors()); + result.put("assignedMemOnHeap", workerSummary.get_assigned_memonheap()); + result.put("assignedMemOffHeap", workerSummary.get_assigned_memoffheap()); + result.put("assignedCpu", workerSummary.get_assigned_cpu()); + result.put("componentNumTasks", workerSummary.get_component_to_num_tasks()); + result.put("uptime", UIHelpers.prettyUptimeSec(workerSummary.get_uptime_secs())); + result.put("uptimeSeconds", workerSummary.get_uptime_secs()); + result.put("workerLogLink", getWorkerLogLink(workerSummary.get_host(), + workerSummary.get_port(), config, workerSummary.get_topology_id())); + return result; + } + + /** + * getSupervisorSummary. + * @param supervisors supervisor summary list. + * @param securityContext security context injected. + * @param config Storm config. + * @return Prettified JSON. + */ + public static Map<String, Object> getSupervisorSummary( + List<SupervisorSummary> supervisors, + SecurityContext securityContext, Map<String, Object> config) { + Map<String, Object> result = new HashMap(); + addLogviewerInfo(config, result); + List<Map> supervisorMaps = getSupervisorsMap(supervisors, config); + result.put("supervisors", supervisorMaps); + result.put("schedulerDisplayResource", + config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE) + ); + + return result; + } + + /** + * getSupervisorsMap. + * @param supervisors supervisors + * @param config config + * @return getSupervisorsMap + */ + private static List<Map> getSupervisorsMap(List<SupervisorSummary> supervisors, Map<String, Object> config) { + List<Map> supervisorMaps = new ArrayList(); + for (SupervisorSummary supervisorSummary : supervisors) { + supervisorMaps.add(getPrettifiedSupervisorMap(supervisorSummary, config)); + } + return supervisorMaps; + } + + /** + * addLogviewerInfo. + * @param config config + * @param result result + */ + private static void addLogviewerInfo(Map<String, Object> config, Map<String, Object> result) { + result.put("logviewerPort", getLogviewerPort(config)); + String logviewerScheme = "http"; + if (isSecureLogviewer(config)) { + logviewerScheme = "https"; + } + result.put("logviewerScheme", logviewerScheme); + } + + /** + * getSupervisorPageInfo. + * @param supervisorPageInfo supervisorPageInfo + * @param config config + * @return getSupervisorPageInfo + */ + public static Map<String, Object> getSupervisorPageInfo( + SupervisorPageInfo supervisorPageInfo, Map<String,Object> config) { + Map<String, Object> result = new HashMap(); + result.put("workers", getWorkerSummaries(supervisorPageInfo, config)); + result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE)); + List<Map> supervisorMaps = getSupervisorsMap(supervisorPageInfo.get_supervisor_summaries(), config); + result.put("supervisors", supervisorMaps); + addLogviewerInfo(config, result); + return result; + } + + /** + * getAllTopologiesSummary. + * @param topologies topologies + * @param config config + * @return getAllTopologiesSummary + */ + public static Map<String, Object> getAllTopologiesSummary( + List<TopologySummary> topologies, Map<String,Object> config) { + Map<String, Object> result = new HashMap(); + result.put("topologies", getTopologiesMap(null, topologies)); + result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE)); + return result; + } + + /** + * getWindowHint. + * @param window window + * @return getWindowHint + */ + public static String getWindowHint(String window) { + if (window.equals(":all-time")) { + return "All time"; + } + return UIHelpers.prettyUptimeSec(window); + } + + /** + * getStatDisplayMap. + * @param rawDisplayMap rawDisplayMap + * @return getStatDisplayMap + */ + public static Map<String, Double> getStatDisplayMap(Map<String, Double> rawDisplayMap) { + Map<String, Double> result = new HashMap(); + for (Map.Entry<String, Double> entry : rawDisplayMap.entrySet()) { + result.put(getWindowHint(entry.getKey()), entry.getValue()); + } + + return result; + } + + /** + * getTopologySummary. + * @param topologyPageInfo topologyPageInfo + * @param window window + * @param config config + * @param remoteUser remoteUser + * @return getTopologySummary + */ + public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo, + String window, Map<String, Object> config, String remoteUser) { + Map<String, Object> result = new HashMap(); + Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf()); + long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS); + Map<String, Object> unpackedTopologyPageInfo = + unpackTopologyInfo(topologyPageInfo, window, config); + result.putAll(unpackedTopologyPageInfo); + result.put("user", remoteUser); + result.put("window", window); + result.put("windowHint", getWindowHint(window)); + result.put("msgTimeout", messageTimeout); + result.put("configuration", topologyConf); + result.put("visualizationTable", new ArrayList()); + result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE)); + return result; + } + + private static Map<String, Long> getStatDisplayMapLong(Map<String,Long> windowToTransferred) { + Map<String, Long> result = new HashMap(); + for (Map.Entry<String, Long> entry : windowToTransferred.entrySet()) { + result.put(entry.getKey(), entry.getValue()); + } + return result; + } + + private static Map<String, Object> getCommonAggStatsMap(CommonAggregateStats commonAggregateStats) { + Map<String, Object> result = new HashMap(); + result.put("executors", commonAggregateStats.get_num_executors()); + result.put("tasks", commonAggregateStats.get_num_tasks()); + result.put("emitted", commonAggregateStats.get_emitted()); + result.put("transferred", commonAggregateStats.get_transferred()); + result.put("acked", commonAggregateStats.get_acked()); + result.put("failed", commonAggregateStats.get_failed()); + result.put( + "requestedMemOnHeap", + commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) + ); + result.put( + "requestedMemOffHeap", + commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME)); + result.put( + "requestedCpu" , + commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME)); + return result; + } + + private static Map<String, Object> getSpoutAggStatsMap( + ComponentAggregateStats componentAggregateStats, String spoutId) { + Map<String, Object> result = new HashMap(); + result.putAll(getCommonAggStatsMap(componentAggregateStats.get_common_stats())); + result.put("spoutId", spoutId); + result.put("encodedSpoutId", URLEncoder.encode(spoutId)); + result.put("completeLatency", + componentAggregateStats.get_specific_stats().get_spout().get_complete_latency_ms()); + return result; + } + + private static Map<String, Object> getBoltAggStatsMap( + ComponentAggregateStats componentAggregateStats, String boltId) { + Map<String, Object> result = new HashMap(); + result.putAll(getCommonAggStatsMap(componentAggregateStats.get_common_stats())); + result.put("boltId", boltId); + result.put("encodedBoltId", URLEncoder.encode(boltId)); + BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt(); + result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity())); + result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms())); + result.put("executed", boltAggregateStats.get_executed()); + result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms())); + return result; + } + + private static List<Map> getTopologyStatsMap(TopologyStats topologyStats) { + List<Map> result = new ArrayList(); + + Map<String, Long> emittedStatDisplayMap = getStatDisplayMapLong(topologyStats.get_window_to_emitted()); + Map<String, Long> transferred = getStatDisplayMapLong(topologyStats.get_window_to_transferred()); + Map<String, Double> completeLatency = getStatDisplayMap(topologyStats.get_window_to_complete_latencies_ms()); + Map<String, Long> acked = getStatDisplayMapLong(topologyStats.get_window_to_acked()); + Map<String, Long> failed = getStatDisplayMapLong(topologyStats.get_window_to_failed()); + for (String window : emittedStatDisplayMap.keySet()) { + Map<String, Object> temp = new HashMap(); + temp.put("windowPretty", getWindowHint(window)); + temp.put("window", window); + temp.put("emitted", emittedStatDisplayMap.get(window)); + temp.put("transferred", transferred.get(window)); + temp.put("completeLatency", completeLatency.get(window)); + temp.put("acked", acked.get(window)); + temp.put("failed", failed.get(window)); + + + result.add(temp); + } + return result; + } + + private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) { + Map<String, Object> result = new HashMap(); + result.put("id", topologyPageInfo.get_id()); + result.put("encodedId", URLEncoder.encode(topologyPageInfo.get_id())); + result.put("owner", topologyPageInfo.get_owner()); + result.put("name", topologyPageInfo.get_name()); + result.put("status", topologyPageInfo.get_status()); + result.put("uptime", UIHelpers.prettyUptimeSec(topologyPageInfo.get_uptime_secs())); + result.put("uptimeSeconds", topologyPageInfo.get_uptime_secs()); + result.put("tasksTotal", topologyPageInfo.get_num_tasks()); + result.put("workersTotal", topologyPageInfo.get_num_workers()); + result.put("executorsTotal", topologyPageInfo.get_num_executors()); + result.put("schedulerInfo", topologyPageInfo.get_sched_status()); + result.put("requestedMemOnHeap", topologyPageInfo.get_requested_memonheap()); + result.put("requestedMemOffHeap", topologyPageInfo.get_requested_memoffheap()); + result.put("requestedCpu", topologyPageInfo.get_requested_cpu()); + result.put("requestedTotalMem", + topologyPageInfo.get_requested_memonheap() + topologyPageInfo.get_requested_memoffheap() + ); + result.put("assignedMemOnHeap", topologyPageInfo.get_assigned_memonheap()); + result.put("assignedMemOffHeap", topologyPageInfo.get_assigned_memoffheap()); + result.put("assignedTotalMem", topologyPageInfo.get_assigned_memonheap() + + topologyPageInfo.get_assigned_memoffheap()); + result.put("assignedCpu", topologyPageInfo.get_assigned_cpu()); + result.put("requestedRegularOnHeapMem", topologyPageInfo.get_requested_regular_on_heap_memory()); + result.put("requestedSharedOnHeapMem", topologyPageInfo.get_requested_shared_on_heap_memory()); + result.put("requestedRegularOffHeapMem", topologyPageInfo.get_requested_regular_off_heap_memory()); + result.put("requestedSharedOffHeapMem", topologyPageInfo.get_requested_shared_off_heap_memory()); + result.put("assignedRegularOnHeapMem", topologyPageInfo.get_assigned_regular_on_heap_memory()); + result.put("assignedSharedOnHeapMem", topologyPageInfo.get_assigned_shared_on_heap_memory()); + result.put("assignedRegularOffHeapMem", topologyPageInfo.get_assigned_regular_off_heap_memory()); + result.put("assignedSharedOffHeapMem", topologyPageInfo.get_assigned_shared_off_heap_memory()); + result.put("topologyStats", getTopologyStatsMap(topologyPageInfo.get_topology_stats())); + List<Map> workerSummaries = new ArrayList(); + if (topologyPageInfo.is_set_workers()) { + for (WorkerSummary workerSummary : topologyPageInfo.get_workers()) { + workerSummaries.add(getWorkerSummaryMap(workerSummary, config)); + } + } + result.put("workers", workerSummaries); + + Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats(); + List<Map> spoutStats = new ArrayList(); + + for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) { + spoutStats.add(getSpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey())); + } + result.put("spouts", spoutStats); + + Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats(); + List<Map> boltStats = new ArrayList(); + + for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) { + boltStats.add(getBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey())); + } + result.put("bolts", boltStats); + + + result.put("configuration", topologyPageInfo.get_topology_conf()); + boolean debuggingEnabled = false; + if (topologyPageInfo.is_set_debug_options()) { + debuggingEnabled = topologyPageInfo.get_debug_options().is_enable(); + } + result.put("debug", debuggingEnabled); + double samplingPct = 10; + if (debuggingEnabled) { + samplingPct = topologyPageInfo.get_debug_options().get_samplingpct(); + } + result.put("samplingPct", samplingPct); + result.put("replicationCount", topologyPageInfo.get_replication_count()); + result.put("topologyVersion", topologyPageInfo.get_topology_version()); + result.put("stormVersion", topologyPageInfo.get_storm_version()); + return result; + } + + /** + * getTopologyWorkers. + * @param topologyInfo topologyInfo + * @param config config + * @return getTopologyWorkers. + */ + public static Map<String, Object> getTopologyWorkers(TopologyInfo topologyInfo, Map config) { + List<Map> executorSummaries = new ArrayList(); + for (ExecutorSummary executorSummary : topologyInfo.get_executors()) { + Map<String, Object> executorSummaryMap = new HashMap(); + executorSummaryMap.put("host", executorSummary.get_host()); + executorSummaryMap.put("port", executorSummary.get_port()); + executorSummaries.add(executorSummaryMap); + } + HashSet hashSet = new HashSet(); + hashSet.addAll(executorSummaries); + executorSummaries.clear(); + executorSummaries.addAll(hashSet); + Map<String, Object> result = new HashMap(); + result.put("hostPortList", executorSummaries); + addLogviewerInfo(config, result); + return result; + } + + + /** + * getTopologyLag. + * @param userTopology userTopology + * @param config config + * @return getTopologyLag. + */ + public static Map<String, Map<String, Object>> getTopologyLag(StormTopology userTopology, Map<String,Object> config) { + return TopologySpoutLag.lag(userTopology, config); + } + + /** + * getBoltExecutors. + * @param executorSummaries executorSummaries + * @param stormTopology stormTopology + * @param sys sys + * @return getBoltExecutors. + */ + public static Map<String, List<ExecutorSummary>> getBoltExecutors(List<ExecutorSummary> executorSummaries, + StormTopology stormTopology, boolean sys) { + Map<String, List<ExecutorSummary>> result = new HashMap(); + for (ExecutorSummary executorSummary : executorSummaries) { + if (StatsUtil.componentType(stormTopology, executorSummary.get_component_id()).equals("bolt") + && (sys || !Utils.isSystemId(executorSummary.get_component_id()))) { + List<ExecutorSummary> executorSummaryList = result.getOrDefault(executorSummary.get_component_id(), new ArrayList()); + executorSummaryList.add(executorSummary); + result.put(executorSummary.get_component_id(), executorSummaryList); + } + } + return result; + } + + /** + * getSpoutExecutors. + * @param executorSummaries executorSummaries + * @param stormTopology stormTopology + * @return getSpoutExecutors. + */ + public static Map<String, List<ExecutorSummary>> getSpoutExecutors(List<ExecutorSummary> executorSummaries, + StormTopology stormTopology) { + Map<String, List<ExecutorSummary>> result = new HashMap(); + for (ExecutorSummary executorSummary : executorSummaries) { + if (StatsUtil.componentType(stormTopology, executorSummary.get_component_id()).equals("spout")) { + List<ExecutorSummary> executorSummaryList = result.getOrDefault(executorSummary.get_component_id(), new ArrayList()); + executorSummaryList.add(executorSummary); + result.put(executorSummary.get_component_id(), executorSummaryList); + } + } + return result; + } + + /** + * sanitizeStreamName. + * @param streamName streamName + * @return sanitizeStreamName + */ + public static String sanitizeStreamName(String streamName) { + Pattern pattern = Pattern.compile("(?![A-Za-z_\\-:\\.])."); + Pattern pattern2 = Pattern.compile("^[A-Za-z]"); + Matcher matcher = pattern2.matcher(streamName); + Matcher matcher2 = pattern.matcher("\\s" + streamName); + if (matcher.find()) { + matcher2 = pattern.matcher(streamName); + } + return matcher2.replaceAll("_"); + } + + /** + * sanitizeTransferredStats. + * @param stats stats + * @return sanitizeTransferredStats + */ + public static Map<String, Map<String,Long>> sanitizeTransferredStats(Map<String, Map<String,Long>> stats) { + Map<String, Map<String,Long>> result = new HashMap(); + for (Map.Entry<String, Map<String,Long>> entry : stats.entrySet()) { + Map<String,Long> temp = new HashMap(); + for (Map.Entry<String,Long> innerEntry : entry.getValue().entrySet()) { + temp.put(sanitizeStreamName(innerEntry.getKey()), innerEntry.getValue()); + } + result.put(entry.getKey(), temp); + } + return result; + } + + /** + * getStatMapFromExecutorSummary. + * @param executorSummary executorSummary + * @return getStatMapFromExecutorSummary + */ + public static Map<String, Object> getStatMapFromExecutorSummary(ExecutorSummary executorSummary) { + Map<String, Object> result = new HashMap(); + result.put("host", executorSummary.get_host()); + result.put("port", executorSummary.get_port()); + result.put("uptime_secs", executorSummary.get_uptime_secs()); + result.put("transferred", null); + if (executorSummary.is_set_stats()) { + result.put("transferred", sanitizeTransferredStats(executorSummary.get_stats().get_transferred())); + } + return result; + } + + /** + * getInputMap. + * @param entryInput entryInput + * @return getInputMap + */ + public static Map<String, Object> getInputMap(Map.Entry<GlobalStreamId,Grouping> entryInput) { + Map<String, Object> result = new HashMap(); + result.put("component", entryInput.getKey().get_componentId()); + result.put("stream", entryInput.getKey().get_streamId()); + result.put("sani-stream", sanitizeStreamName(entryInput.getKey().get_streamId())); + result.put("grouping", entryInput.getValue()); + return result; + } + + /** + * getVisualizationData. + * @param client client + * @param window window + * @param topoId topoId + * @param sys sys + * @return getVisualizationData + * @throws TException TException + */ + public static Map<String, Object> getVisualizationData( + Nimbus.Iface client, String window, String topoId, boolean sys) throws TException { + GetInfoOptions getInfoOptions = new GetInfoOptions(); + getInfoOptions.set_num_err_choice(NumErrorsChoice.ONE); + TopologyInfo topologyInfo = client.getTopologyInfoWithOpts(topoId, getInfoOptions); + StormTopology stormTopology = client.getTopology(topoId); + Map<String, List<ExecutorSummary>> boltSummaries = getBoltExecutors(topologyInfo.get_executors(), stormTopology, sys); + Map<String, List<ExecutorSummary>> spoutSummaries = getSpoutExecutors(topologyInfo.get_executors(), stormTopology); + + Map<String, SpoutSpec> spoutSpecs = stormTopology.get_spouts(); + Map<String, Bolt> boltSpecs = stormTopology.get_bolts(); + + Map<String, Object> result = new HashMap(); + + for (Map.Entry<String, SpoutSpec> spoutSpecMapEntry : spoutSpecs.entrySet()) { + String spoutComponentId = spoutSpecMapEntry.getKey(); + if (spoutSummaries.containsKey(spoutComponentId)) { + Map<String, Object> spoutData = new HashMap(); + spoutData.put("type", "spout"); + spoutData.put("capacity", 0); + Map<String, Map> spoutStreamsStats = StatsUtil.spoutStreamsStats(spoutSummaries.get(spoutComponentId), true); + spoutData.put("latency", spoutStreamsStats.get("complete-latencies").get(window)); + spoutData.put("transferred", spoutStreamsStats.get("transferred").get(window)); + spoutData.put("stats", spoutSummaries.get( + spoutComponentId).stream().map( + UIHelpers::getStatMapFromExecutorSummary).collect(Collectors.toList())); + spoutData.put("link", UIHelpers.urlFormat("/component.html?id=%s&topology_id=%s", spoutComponentId, topoId)); + + result.put("inputs", + spoutSpecMapEntry.getValue().get_common().get_inputs().entrySet().stream().map( + UIHelpers::getInputMap).collect(Collectors.toList()) + ); + result.put(spoutComponentId, spoutData); + } + } + + for (Map.Entry<String, Bolt> boltEntry : boltSpecs.entrySet()) { + String boltComponentId = boltEntry.getKey(); + if (boltSummaries.containsKey(boltComponentId) && (sys || Utils.isSystemId(boltComponentId))) { + Map<String, Object> boltMap = new HashMap(); + boltMap.put("type", "bolt"); + boltMap.put("capacity", StatsUtil.computeBoltCapacity(boltSummaries.get(boltComponentId))); + Map<String, Map> boltStreamsStats = StatsUtil.boltStreamsStats(boltSummaries.get(boltComponentId), true); + boltMap.put("latency", boltStreamsStats.get("process-latencies").get(window)); + boltMap.put("transferred", boltStreamsStats.get("transferred").get(window)); + boltMap.put("stats", boltSummaries.get( + boltComponentId).stream().map( + UIHelpers::getStatMapFromExecutorSummary).collect(Collectors.toList())); + boltMap.put("link", UIHelpers.urlFormat("/component.html?id=%s&topology_id=%s", boltComponentId, topoId)); + + result.put("inputs", + boltEntry.getValue().get_common().get_inputs().entrySet().stream().map( + UIHelpers::getInputMap).collect(Collectors.toList()) + ); + result.put(boltComponentId, boltMap); + } + } + return result; + } + + /** + * getStreamBox. + * @param visualization visualization + * @return getStreamBox + */ + public static Map<String, Object> getStreamBox(Object visualization) { + Map<String, Object> visualizationData = (Map<String, Object>) visualization; + Map<String, Object> result = new HashMap(); + Map<String, Object> temp = (Map<String, Object>) visualizationData.get("inputs"); + result.put("stream", temp.get("stream")); + result.put("sani-stream", temp.get("sani-stream")); + result.put("checked", !Utils.isSystemId((String) temp.get("stream"))); + return result; + } + + /** + * getBuildVisualization. + * @param client client + * @param config config + * @param window window + * @param id id + * @param sys sys + * @ret --- End diff -- Not totally sure what happened here but the visualization does not work because the groupings are not being mapped to JSON correctly. ``` {"inputs":[{"component":"__acker","sani-stream":"_s__ack_reset_timeout","stream":"__ack_reset_timeout","grouping":<Grouping direct:NullStruct()>},{"component":"__acker","sani-stream":"_s__ack_ack","stream":"__ack_ack","grouping":<Grouping direct:NullStruct()>},{"component":"__acker","sani-stream":"_s__ack_fail","stream":"__ack_fail","grouping":<Grouping direct:NullStruct()>}],"spout":{"transferred":2800,"stats":[{"transferred":{"10800":{"default":2800},"600":{"default":2800},"86400":{"default":2800},":all-time":{"default":2800}},"port":6700,"host":"10.0.1.12","uptime_secs":61},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":560},":all-time":{"default":560}},"port":6701,"host":"10.0.1.12","uptime_secs":60},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":560},":all-time":{"default":560}},"port":6702,"host":"10.0.1.12","uptime_secs":61},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":5 60},":all-time":{"default":560}},"port":6700,"host":"10.0.1.12","uptime_secs":61},{"transferred":{"10800":{"default":560},"600":{"default":560},"86400":{"default":560},":all-time":{"default":560}},"port":6702,"host":"10.0.1.12","uptime_secs":61}],"latency":null,"link":"\/component.html?id=spout&topology_id=wc-1-1533304535","type":"spout","capacity":0}} ``` `<Grouping direct:NullStruct()>` is not what we want as output.
---