[ 
https://issues.apache.org/jira/browse/STORM-312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045236#comment-14045236
 ] 

ASF GitHub Bot commented on STORM-312:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/117#discussion_r14268626
  
    --- Diff: storm-core/src/jvm/backtype/storm/command/Monitor.java ---
    @@ -0,0 +1,197 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.command;
    +
    +import backtype.storm.generated.*;
    +import backtype.storm.utils.NimbusClient;
    +import backtype.storm.utils.Utils;
    +import org.kohsuke.args4j.CmdLineException;
    +import org.kohsuke.args4j.CmdLineParser;
    +import org.kohsuke.args4j.Option;
    +
    +import java.util.Map;
    +
    +public class Monitor {
    +    @Option(name="--help", aliases={"-h"}, usage="print help message")
    +    private boolean _help = false;
    +
    +    @Option(name="--interval", aliases={"-i"}, usage="poll frequency in 
seconds")
    +    private int _interval = 4;
    +
    +    @Option(name="--name", aliases={"--topologyName"}, metaVar="NAME",
    +            usage="base name of the topology (numbers may be appended to 
the end)")
    +    private String _name;
    +
    +    @Option(name="--component", aliases={"--componentName"}, 
metaVar="NAME",
    +            usage="component name of the topology")
    +    private String _component;
    +
    +    @Option(name="--stat", aliases={"--statItem"}, metaVar="ITEM",
    +            usage="stat item [emitted | transferred]")
    +    private String _stat = "emitted";
    +
    +    private static class MetricsState {
    +        long lastStatted = 0;
    +        long lastTime = 0;
    +    }
    +
    +    public void metrics(Nimbus.Client client, int poll, String name, 
String component, String stat) throws Exception {
    +        
System.out.println("status\ttopologie\tslots\tcomponent\texecutors\texecutorsWithMetrics\ttime-diff
 ms\t" + stat + "\tthroughput (Kt/s)");
    +        MetricsState state = new MetricsState();
    +        long pollMs = poll * 1000;
    +        long now = System.currentTimeMillis();
    +        state.lastTime = now;
    +        long startTime = now;
    +        long cycle, sleepTime, wakeupTime;
    +
    +        while (metrics(client, name, component, stat, now, state, 
"WAITING")) {
    +            now = System.currentTimeMillis();
    +            cycle = (now - startTime)/pollMs;
    +            wakeupTime = startTime + (pollMs * (cycle + 1));
    +            sleepTime = wakeupTime - now;
    +            if (sleepTime > 0) {
    +                Thread.sleep(sleepTime);
    +            }
    +            now = System.currentTimeMillis();
    +        }
    +
    +        now = System.currentTimeMillis();
    +        cycle = (now - startTime)/pollMs;
    +        wakeupTime = startTime + (pollMs * (cycle + 1));
    +        sleepTime = wakeupTime - now;
    +        if (sleepTime > 0) {
    +            Thread.sleep(sleepTime);
    +        }
    +        now = System.currentTimeMillis();
    +        do {
    +            metrics(client, name, component, stat, now, state, "RUNNING");
    +            now = System.currentTimeMillis();
    +            cycle = (now - startTime)/pollMs;
    +            wakeupTime = startTime + (pollMs * (cycle + 1));
    +            sleepTime = wakeupTime - now;
    +            if (sleepTime > 0) {
    +                Thread.sleep(sleepTime);
    +            }
    +            now = System.currentTimeMillis();
    +        } while (true);
    +    }
    +
    +    public boolean metrics(Nimbus.Client client, String name, String 
component, String stat, long now, MetricsState state, String message) throws 
Exception {
    +        long totalStatted = 0;
    +
    +        boolean topologyFound = false;
    +        boolean componentFound = false;
    +        int slotsUsed = 0;
    +        int executors = 0;
    +        int executorsWithMetrics = 0;
    +        ClusterSummary summary = client.getClusterInfo();
    +        for (TopologySummary ts: summary.get_topologies()) {
    +            if (name.equals(ts.get_name())) {
    +                topologyFound = true;
    +                slotsUsed = ts.get_num_workers();
    +                String id = ts.get_id();
    +                TopologyInfo info = client.getTopologyInfo(id);
    +                for (ExecutorSummary es: info.get_executors()) {
    +                    if (component.equals(es.get_component_id())) {
    +                        componentFound = true;
    +                        executors ++;
    +                        ExecutorStats stats = es.get_stats();
    +                        if (stats != null) {
    +                            Map<String,Map<String,Long>> statted =
    +                                    "emitted".equals(stat) ? 
stats.get_emitted() : stats.get_transferred();
    +                            if ( statted != null) {
    +                                Map<String, Long> e2 = 
statted.get(":all-time");
    +                                if (e2 != null) {
    +                                    executorsWithMetrics++;
    +                                    //topology messages are always on the 
default stream, so just count those
    +                                    Long dflt = e2.get("default");
    --- End diff --
    
    This was only true for the simple test that we had set up.  We probably 
want some way to handle more then just default.


> 'storm monitor' to monitor throughput performance interactively
> ---------------------------------------------------------------
>
>                 Key: STORM-312
>                 URL: https://issues.apache.org/jira/browse/STORM-312
>             Project: Apache Storm (Incubating)
>          Issue Type: New Feature
>            Reporter: Jiahong Li
>              Labels: cmd-line-monitor
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> This cmd line tool 'storm monitor' will use Nimbus.Client to get throughput 
> information from Nimbus Server.
> 1)One can specify topology's name, component's name to monitor it's 
> throughput interactively.
> 2) It will statistics 'emit' and/or 'transferred' throughput in a given time 
> window and print it in a given time frequency
> The implementation will be much like yahoo's storm-perf-test 
> (http://yahooeng.tumblr.com/post/64758709722/making-storm-fly-with-netty) 



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to