Repository: storm Updated Branches: refs/heads/master a1e6e98c6 -> f5a410ba3
STORM-3217: Fixing getComponentPage API call Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a805c70b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a805c70b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a805c70b Branch: refs/heads/master Commit: a805c70b210f47cb4a5192a6692888637a81f654 Parents: 8f9061a Author: Govind Menon <[email protected]> Authored: Wed Sep 12 15:17:43 2018 -0500 Committer: Govind Menon <[email protected]> Committed: Wed Sep 12 16:57:08 2018 -0500 ---------------------------------------------------------------------- .../org/apache/storm/daemon/ui/UIHelpers.java | 380 ++++++++++++++++++- 1 file changed, 361 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a805c70b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java ---------------------------------------------------------------------- diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java index a080599..c280cf2 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java @@ -25,12 +25,14 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.URLEncoder; import java.util.ArrayList; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -49,6 +51,9 @@ import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.CommonAggregateStats; import org.apache.storm.generated.ComponentAggregateStats; import 
org.apache.storm.generated.ComponentPageInfo; +import org.apache.storm.generated.ComponentType; +import org.apache.storm.generated.ErrorInfo; +import org.apache.storm.generated.ExecutorAggregateStats; import org.apache.storm.generated.ExecutorInfo; import org.apache.storm.generated.ExecutorSummary; import org.apache.storm.generated.GetInfoOptions; @@ -66,6 +71,8 @@ import org.apache.storm.generated.OwnerResourceSummary; import org.apache.storm.generated.ProfileAction; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.generated.RebalanceOptions; +import org.apache.storm.generated.SpecificAggregateStats; +import org.apache.storm.generated.SpoutAggregateStats; import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.SupervisorPageInfo; @@ -975,7 +982,8 @@ public class UIHelpers { * @param config config * @return getSupervisorsMap */ - private static List<Map> getSupervisorsMap(List<SupervisorSummary> supervisors, Map<String, Object> config) { + private static List<Map> getSupervisorsMap(List<SupervisorSummary> supervisors, + Map<String, Object> config) { List<Map> supervisorMaps = new ArrayList(); for (SupervisorSummary supervisorSummary : supervisors) { supervisorMaps.add(getPrettifiedSupervisorMap(supervisorSummary, config)); @@ -1080,6 +1088,11 @@ public class UIHelpers { return result; } + /** + * getStatDisplayMapLong. + * @param windowToTransferred windowToTransferred + * @return getStatDisplayMapLong + */ private static Map<String, Long> getStatDisplayMapLong(Map<String,Long> windowToTransferred) { Map<String, Long> result = new HashMap(); for (Map.Entry<String, Long> entry : windowToTransferred.entrySet()) { @@ -1088,6 +1101,11 @@ public class UIHelpers { return result; } + /** + * getCommonAggStatsMap. 
+ * @param commonAggregateStats commonAggregateStats + * @return getCommonAggStatsMap + */ private static Map<String, Object> getCommonAggStatsMap(CommonAggregateStats commonAggregateStats) { Map<String, Object> result = new HashMap(); result.put("executors", commonAggregateStats.get_num_executors()); @@ -1096,34 +1114,272 @@ public class UIHelpers { result.put("transferred", commonAggregateStats.get_transferred()); result.put("acked", commonAggregateStats.get_acked()); result.put("failed", commonAggregateStats.get_failed()); + if (commonAggregateStats.is_set_resources_map()) { + result.put( + "requestedMemOnHeap", + commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) + ); + result.put( + "requestedMemOffHeap", + commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME)); + result.put( + "requestedCpu", + commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME)); + } + return result; + } + + /** + * getTruncatedErrorString. + * @param errorString errorString + * @return getTruncatedErrorString + */ + private static String getTruncatedErrorString(String errorString) { + return errorString.substring(0, Math.min(errorString.length(), 200)); + } + + /** + * getSpoutAggStatsMap. 
+     * @param componentAggregateStats aggregate stats whose common stats, spout-specific stats and last error
+     *     are read
+     * @param window window identifier; stored under "window" and passed to getWindowHint for "windowPretty"
+     * @return map with the keys window, windowPretty, emitted, transferred, acked, failed, completeLatency
+     *     and lastError
+     */
+    private static Map<String, Object> getSpoutAggStatsMap(
+            ComponentAggregateStats componentAggregateStats, String window) {
+        Map<String, Object> result = new HashMap<>();
+        SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
+        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
+        result.put("window", window);
+        result.put("windowPretty", getWindowHint(window));
+        result.put("emitted", commonStats.get_emitted());
+        result.put("transferred", commonStats.get_transferred());
+        result.put("acked", commonStats.get_acked());
+        result.put("failed", commonStats.get_failed());
+        // Format the latency the same way getSpoutOutputStats and getSpoutExecutorStats do.
+        result.put("completeLatency", StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms()));
+
+        // When no last error is present an empty string is reported; otherwise the truncated message.
+        ErrorInfo lastError = componentAggregateStats.get_last_error();
+        result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
+        return result;
+    }
+
+    /**
+     * getBoltAggStatsMap.
+     * @param componentAggregateStats aggregate stats whose common stats and bolt-specific stats are read
+     * @param window window identifier; stored under "window" and passed to getWindowHint for "windowPretty"
+     * @return map with the keys window, windowPretty, emitted, transferred, acked, failed, executeLatency,
+     *     executed, processLatency and capacity
+     */
+    private static Map<String, Object> getBoltAggStatsMap(
+            ComponentAggregateStats componentAggregateStats, String window) {
+        Map<String, Object> result = new HashMap<>();
+        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
+        result.put("window", window);
+        result.put("windowPretty", getWindowHint(window));
+        result.put("emitted", commonStats.get_emitted());
+        result.put("transferred", commonStats.get_transferred());
+        result.put("acked", commonStats.get_acked());
+        result.put("failed", commonStats.get_failed());
+        BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
+        // Latencies and capacity are rendered through StatsUtil.floatStr; the executed count is stored as is.
+        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
+        result.put("executed", boltAggregateStats.get_executed());
+        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
+        result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
+        return result;
+    }
+
+    /**
+     * Replaces a missing count with zero.
+     * @param value value to check, may be null
+     * @return {@code value} when it is not null, otherwise zero
+     */
+    private static Long nullToZero(Long value) {
+        // Only a null input is replaced; both branches are boxed so a null is never unboxed.
+        return Objects.isNull(value) ? Long.valueOf(0L) : value;
+    }
+
+    /**
+     * Replaces a missing floating point value with zero.
+     * @param value value to check, may be null
+     * @return {@code value} when it is not null, otherwise zero
+     */
+    private static Double nullToZero(Double value) {
+        // Only a null input is replaced; both branches are boxed so a null is never unboxed.
+        return Objects.isNull(value) ? Double.valueOf(0.0d) : value;
+    }
+
+    /**
+     * getBoltInputStats.
+     * @param globalStreamId identifies the input; its component id and stream id are reported
+     * @param componentAggregateStats aggregate stats whose common stats and bolt-specific stats are read
+     * @return map with the keys component, encodedComponentId, stream, executeLatency, processLatency,
+     *     executed, acked and failed
+     */
+    private static Map<String, Object> getBoltInputStats(GlobalStreamId globalStreamId,
+            ComponentAggregateStats componentAggregateStats) {
+        Map<String, Object> result = new HashMap<>();
+        SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
+        BoltAggregateStats boltAggregateStats = specificAggregateStats.get_bolt();
+        CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats();
+        String componentId = globalStreamId.get_componentId();
+        result.put("component", componentId);
+        // The URL-encoded form of the component id is stored next to the raw id.
+        result.put("encodedComponentId", URLEncoder.encode(componentId));
+        result.put("stream", globalStreamId.get_streamId());
+        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
+        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
+        result.put("executed", nullToZero(boltAggregateStats.get_executed()));
+        result.put("acked", nullToZero(commonAggregateStats.get_acked()));
+        result.put("failed", nullToZero(commonAggregateStats.get_failed()));
+        return result;
+    }
+
+    /**
+     * Builds the map describing one output stream of a bolt.
+     * @param streamId id of the output stream; stored under "stream"
+     * @param componentAggregateStats aggregate stats whose common stats are read
+     * @return map with the keys stream, emitted and transferred
+     */
+    private static Map<String, Object> getBoltOutputStats(String streamId,
+            ComponentAggregateStats componentAggregateStats) {
+        Map<String, Object> result = new HashMap<>();
+        result.put("stream", streamId);
+        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
+        result.put("emitted", nullToZero(commonStats.get_emitted()));
+        result.put("transferred", nullToZero(commonStats.get_transferred()));
+        return result;
+    }
+
+    /**
+     * getSpoutOutputStats.
+     * @param streamId id of the output stream; stored under "stream"
+     * @param componentAggregateStats aggregate stats whose common stats and spout-specific stats are read
+     * @return map with the keys stream, emitted, transferred, completeLatency, acked and failed
+     */
+    private static Map<String, Object> getSpoutOutputStats(String streamId,
+            ComponentAggregateStats componentAggregateStats) {
+        SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
+        SpoutAggregateStats spoutAggregateStats = specificAggregateStats.get_spout();
+        Map<String, Object> result = new HashMap<>();
+        result.put("stream", streamId);
+        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
+        result.put("emitted", nullToZero(commonStats.get_emitted()));
+        result.put("transferred", nullToZero(commonStats.get_transferred()));
+        result.put("completeLatency", StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms()));
+        result.put("acked", nullToZero(commonStats.get_acked()));
+        result.put("failed", nullToZero(commonStats.get_failed()));
+        return result;
+    }
+
+    /**
+     * Builds the map describing one executor of a bolt.
+     * @param topologyId topology id, passed to getWorkerLogLink
+     * @param config config, passed to getWorkerLogLink
+     * @param executorAggregateStats executor summary and stats to unpack
+     * @return map with the keys id, encodedId, uptime, uptimeSeconds, host, port, emitted, transferred,
+     *     capacity, executeLatency, executed, processLatency, acked, failed and workerLogLink
+     */
+    private static Map<String, Object> getBoltExecutorStats(String topologyId, Map<String, Object> config,
+            ExecutorAggregateStats executorAggregateStats) {
+        Map<String, Object> result = new HashMap<>();
+        ExecutorSummary executorSummary = executorAggregateStats.get_exec_summary();
+        ExecutorInfo executorInfo = executorSummary.get_executor_info();
+        ComponentAggregateStats componentAggregateStats = executorAggregateStats.get_stats();
+        SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
+        BoltAggregateStats boltAggregateStats = specificAggregateStats.get_bolt();
+        CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats();
+        String executorId = prettyExecutorInfo(executorInfo);
+        result.put("id", executorId);
+        result.put("encodedId", URLEncoder.encode(executorId));
+        // Uptime is reported twice: prettified and as the raw number of seconds.
+        result.put("uptime", prettyUptimeSec(executorSummary.get_uptime_secs()));
+        result.put("uptimeSeconds", executorSummary.get_uptime_secs());
+        String host = executorSummary.get_host();
+        result.put("host", host);
+        int port = executorSummary.get_port();
+        result.put("port", port);
+        result.put("emitted", nullToZero(commonAggregateStats.get_emitted()));
+        result.put("transferred", nullToZero(commonAggregateStats.get_transferred()));
+        result.put("capacity", StatsUtil.floatStr(nullToZero(boltAggregateStats.get_capacity())));
+        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
+        result.put("executed", nullToZero(boltAggregateStats.get_executed()));
+        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
+        result.put("acked", nullToZero(commonAggregateStats.get_acked()));
+        result.put("failed", nullToZero(commonAggregateStats.get_failed()));
+        // host and port are kept in locals because the worker log link is built from them as well.
+        result.put("workerLogLink", getWorkerLogLink(host, port, config, topologyId));
+        return result;
+    }
+
+    /**
+     * Builds the map describing one executor of a spout.
+     * @param topologyId topology id, passed to getWorkerLogLink
+     * @param config config, passed to getWorkerLogLink
+     * @param executorAggregateStats executor summary and stats to unpack
+     * @return map with the keys id, encodedId, uptime, uptimeSeconds, host, port, emitted, transferred,
+     *     completeLatency, acked, failed and workerLogLink
+     */
+    private static Map<String, Object> getSpoutExecutorStats(String topologyId, Map<String, Object> config,
+            ExecutorAggregateStats executorAggregateStats) {
+        Map<String, Object> result = new HashMap<>();
+        ExecutorSummary executorSummary = executorAggregateStats.get_exec_summary();
+        ExecutorInfo executorInfo = executorSummary.get_executor_info();
+        ComponentAggregateStats componentAggregateStats = executorAggregateStats.get_stats();
+        SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats();
+        SpoutAggregateStats spoutAggregateStats = specificAggregateStats.get_spout();
+        CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats();
+        String executorId = prettyExecutorInfo(executorInfo);
+        result.put("id", executorId);
+        result.put("encodedId", URLEncoder.encode(executorId));
+        result.put("uptime", prettyUptimeSec(executorSummary.get_uptime_secs()));
+        result.put("uptimeSeconds", executorSummary.get_uptime_secs());
+        String host = executorSummary.get_host();
+        result.put("host", host);
+        int port = executorSummary.get_port();
+
result.put("port", port); + result.put("emitted", nullToZero(commonAggregateStats.get_emitted())); + result.put("transferred", nullToZero(commonAggregateStats.get_transferred())); + result.put("completeLatency", StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms())); + result.put("acked", nullToZero(commonAggregateStats.get_acked())); + result.put("failed", nullToZero(commonAggregateStats.get_failed())); + result.put("workerLogLink", getWorkerLogLink(host, port, config, topologyId)); + return result; + } + + private static Map<String, Object> getComponentErrorInfo(ErrorInfo errorInfo, Map config, + String topologyId) { + Map<String, Object> result = new HashMap(); + result.put("time", 1000 * (long) errorInfo.get_error_time_secs()); + String host = errorInfo.get_host(); + result.put("errorHost", host); + int port = errorInfo.get_port(); + result.put("errorPort", port); + result.put("errorWorkerLogLink", getWorkerLogLink(host, port, config, topologyId)); + result.put("errorLapsedSecs", System.currentTimeMillis() / 1000 - errorInfo.get_error_time_secs()); + result.put("error", errorInfo.get_error()); + return result; + } + + private static Map<String, Object> getComponentErrors(List<ErrorInfo> errorInfoList, + String topologyId, Map config) { + Map<String, Object> result = new HashMap(); + errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs)); result.put( - "requestedMemOnHeap", - commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) - ); - result.put( - "requestedMemOffHeap", - commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME)); + "componentErrors", + errorInfoList.stream().map( + e -> getComponentErrorInfo(e, config, topologyId) + ).collect(Collectors.toList())); + return result; + } + + private static Map<String, Object> getTopologyErrors(List<ErrorInfo> errorInfoList, + String topologyId, Map config) { + Map<String, Object> result = new HashMap(); + 
errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs)); result.put( - "requestedCpu" , - commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME)); + "topologyErrors", + errorInfoList.stream().map( + e -> getComponentErrorInfo(e, config, topologyId) + ).collect(Collectors.toList())); return result; } - private static Map<String, Object> getSpoutAggStatsMap( - ComponentAggregateStats componentAggregateStats, String spoutId) { + private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats, + String spoutId) { Map<String, Object> result = new HashMap(); - result.putAll(getCommonAggStatsMap(componentAggregateStats.get_common_stats())); + CommonAggregateStats commonStats = componentAggregateStats.get_common_stats(); + result.putAll(getCommonAggStatsMap(commonStats)); result.put("spoutId", spoutId); result.put("encodedSpoutId", URLEncoder.encode(spoutId)); - result.put("completeLatency", - componentAggregateStats.get_specific_stats().get_spout().get_complete_latency_ms()); + SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout(); + result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms()); + ErrorInfo lastError = componentAggregateStats.get_last_error(); + result.put("lastError", Objects.isNull(lastError) ? 
"" : getTruncatedErrorString(lastError.get_error())); return result; } - private static Map<String, Object> getBoltAggStatsMap( - ComponentAggregateStats componentAggregateStats, String boltId) { + private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats, + String boltId) { Map<String, Object> result = new HashMap(); - result.putAll(getCommonAggStatsMap(componentAggregateStats.get_common_stats())); + CommonAggregateStats commonStats = componentAggregateStats.get_common_stats(); + result.putAll(getCommonAggStatsMap(commonStats)); result.put("boltId", boltId); result.put("encodedBoltId", URLEncoder.encode(boltId)); BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt(); @@ -1131,6 +1387,8 @@ public class UIHelpers { result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms())); result.put("executed", boltAggregateStats.get_executed()); result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms())); + ErrorInfo lastError = componentAggregateStats.get_last_error(); + result.put("lastError", Objects.isNull(lastError) ? 
"" : getTruncatedErrorString(lastError.get_error())); return result; } @@ -1203,7 +1461,7 @@ public class UIHelpers { List<Map> spoutStats = new ArrayList(); for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) { - spoutStats.add(getSpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey())); + spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey())); } result.put("spouts", spoutStats); @@ -1211,7 +1469,7 @@ public class UIHelpers { List<Map> boltStats = new ArrayList(); for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) { - boltStats.add(getBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey())); + boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey())); } result.put("bolts", boltStats); @@ -1541,6 +1799,83 @@ public class UIHelpers { } /** + * unpackBoltPageInfo. + * @param componentPageInfo componentPageInfo + * @param topologyId topologyId + * @param window window + * @param sys sys + * @param config config + * @return unpackBoltPageInfo + */ + public static Map<String, Object> unpackBoltPageInfo(ComponentPageInfo componentPageInfo, + String topologyId, String window, boolean sys, + Map config) { + Map<String, Object> result = new HashMap(); + + result.put( + "boltStats", + componentPageInfo.get_window_to_stats().entrySet().stream().map( + e -> getBoltAggStatsMap(e.getValue(), e.getKey()) + ).collect(Collectors.toList()) + ); + result.put( + "inputStats", + componentPageInfo.get_gsid_to_input_stats().entrySet().stream().map( + e -> getBoltInputStats(e.getKey(), e.getValue()) + ).collect(Collectors.toList()) + ); + result.put( + "outputStats", + componentPageInfo.get_sid_to_output_stats().entrySet().stream().map( + e -> getBoltOutputStats(e.getKey(), e.getValue()) + ).collect(Collectors.toList()) + ); + result.put( + "executorStats", + componentPageInfo.get_exec_stats().stream().map( + e -> getBoltExecutorStats(topologyId, config, e) + 
).collect(Collectors.toList()) + ); + result.putAll(getComponentErrors(componentPageInfo.get_errors(), topologyId, config)); + return result; + } + + /** + * unpackSpoutPageInfo. + * @param componentPageInfo componentPageInfo + * @param topologyId topologyId + * @param window window + * @param sys sys + * @param config config + * @return unpackSpoutPageInfo + */ + public static Map<String, Object> unpackSpoutPageInfo(ComponentPageInfo componentPageInfo, + String topologyId, String window, boolean sys, + Map config) { + Map<String, Object> result = new HashMap(); + result.put( + "spoutSummary", + componentPageInfo.get_window_to_stats().entrySet().stream().map( + e -> getSpoutAggStatsMap(e.getValue(), e.getKey()) + ).collect(Collectors.toList()) + ); + result.put( + "outputStats", + componentPageInfo.get_sid_to_output_stats().entrySet().stream().map( + e -> getSpoutOutputStats(e.getKey(), e.getValue()) + ).collect(Collectors.toList()) + ); + result.put( + "executorStats", + componentPageInfo.get_exec_stats().stream().map( + e -> getSpoutExecutorStats(topologyId, config, e) + ).collect(Collectors.toList()) + ); + result.putAll(getComponentErrors(componentPageInfo.get_errors(), topologyId, config)); + return result; + } + + /** * getComponentPage. * @param client client * @param id id @@ -1559,6 +1894,13 @@ public class UIHelpers { ComponentPageInfo componentPageInfo = client.getComponentPageInfo( id, component, window, sys ); + + if (componentPageInfo.get_component_type().equals(ComponentType.BOLT)) { + result.putAll(unpackBoltPageInfo(componentPageInfo, id, window, sys, config)); + } else if ((componentPageInfo.get_component_type().equals(ComponentType.SPOUT))) { + result.putAll(unpackSpoutPageInfo(componentPageInfo, id, window, sys, config)); + } + result.put("user", user); result.put("id" , component); result.put("encodedId", URLEncoder.encode(component));
