Every second, the ovs-dpctl dump-flows output is aggregated and presented
in decreasing order by packet count. Several flow concepts are extracted
and aggregated individually:
  - MAC address pairs
  - IPv4 and IPv6 address pairs
  - eth types
  - ip protocol
  - destination port
  - in port

Signed-off-by: Mark Hamilton <[email protected]>
---
 utilities/ovs-dpctl-top.in |  564 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 564 insertions(+)
 create mode 100755 utilities/ovs-dpctl-top.in

diff --git a/utilities/ovs-dpctl-top.in b/utilities/ovs-dpctl-top.in
new file mode 100755
index 0000000..0ef332c
--- /dev/null
+++ b/utilities/ovs-dpctl-top.in
@@ -0,0 +1,564 @@
+#! @PYTHON@
+#
+# Copyright (c) 2013 Nicira, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""Top like behavior for ovs-dpctl dump-flows output.
+
+This program summarizes ovs-dpctl flow content by aggregating the number
+of packets and occurrences of key flow concepts. These concepts are:
+ - the source and destination MAC address
+ - the source and destination IPv4 address
+ - the source and destination IPv6 address
+ - ethernet types
+ - IP protocol
+ - destination port and in port
+
+Output shows three values:
+ - FLOW - the flow concept
+ - PACKETS - the total number of packets containing the flow concept
+ - COUNT - the number of lines in the dump-flow output that contain the
+   flow concept.
+
+Expected usage:
+
+$ ovs-dpctl-top
+
+or to run in a script:
+$ ovs-dpctl dump-flows > dump-flows.log
+$ ovs-dpctl-top --script --flow-file dump-flows.log
+
+"""
+
+# pylint: disable-msg=C0103
+
+import sys
+import os
+import argparse
+import logging
+import re
+import unittest
+import copy
+import curses
+import time
+import operator
+import subprocess
+import netaddr
+import fcntl
+import struct
+import termios
+
+# Regular-expression fragments for MAC, IPv4 and IPv6 addresses.
+# They only check the digit/separator layout, not that the value is a valid
+# address; the IPv6 fragment covers the full eight-group form only (no "::").
+# NOTE(review): none of the three is referenced anywhere else in this file --
+# confirm whether they are still needed.
+MAC_ADDR = "([\da-fA-F]{2}[:]?){6}"
+IPV4_ADDR = "([\d]{1,3}[\.]?){4}"
+IPV6_ADDR = "([\da-fA-F]{1,4}:){7}([\da-fA-F]{1,4})"
+
+
+def element_eth_get(element_type, element, flow_line_as_dict):
+    """ Build the summary entry for an ethernet src/dst pair.
+
+    element_type is the dump-flow key, element is the parsed dictionary
+    holding "src" and "dst", and flow_line_as_dict is the whole parsed flow
+    line, from which the "packets" value is read.
+    """
+    label = "%s(src=%s,dst=%s)" % (element_type,
+                                   element["src"], element["dst"])
+    packets = flow_line_as_dict["packets"]
+    return SumData(element_type, label, packets)
+
+
+def element_ipv4_get(element_type, element, flow_line_as_dict):
+    """ Build the summary entry for an IPv4 src/dst pair.
+
+    The text shown to the user keeps src and dst exactly as parsed, while
+    the key the entry is aggregated under uses the network address that
+    netaddr derives from each of them.
+    """
+    template = "%s(src=%s,dst=%s)"
+    src = element["src"]
+    dst = element["dst"]
+    shown = template % (element_type, src, dst)
+
+    src_network = netaddr.IPNetwork(src).ipv4().network
+    dst_network = netaddr.IPNetwork(dst).ipv4().network
+    lookup_key = template % (element_type, src_network, dst_network)
+
+    return SumMaskData(element_type, shown,
+                       flow_line_as_dict["packets"], lookup_key)
+
+
+def element_ipv6_get(element_type, element, flow_line_as_dict):
+    """ Build the summary entry for an IPv6 src/dst pair.
+
+    The text shown to the user keeps src and dst exactly as parsed, while
+    the key the entry is aggregated under uses the network address that
+    netaddr derives from each of them.
+    """
+    template = "%s(src=%s,dst=%s)"
+    shown = template % (element_type, element["src"], element["dst"])
+
+    networks = [str(netaddr.IPNetwork(element[side]).ipv6().network)
+                for side in ("src", "dst")]
+    lookup_key = template % (element_type, networks[0], networks[1])
+
+    return SumMaskData(element_type, shown,
+                       flow_line_as_dict["packets"], lookup_key)
+
+
+def element_dst_port_get(element_type, element, flow_line_as_dict):
+    """ Build the summary entry for a destination port.
+
+    Only element["dst"] is used; the source port is left out of the entry.
+    """
+    label = "%s(dst=%s)" % (element_type, element["dst"])
+    return SumData(element_type, label, flow_line_as_dict["packets"])
+
+
+def element_passthrough_get(element_type, element, flow_line_as_dict):
+    """ Build the summary entry for an element that is used as parsed.
+
+    The element value is placed between parentheses after its type, for
+    example in_port(4).
+    """
+    label = "%s(%s)" % (element_type, element)
+    return SumData(element_type, label, flow_line_as_dict["packets"])
+
+
+# pylint: disable-msg=R0903
+class OutputFormat:
+    """ Pairs a flow element type with the function that summarizes it.
+
+    element_type is the key looked up in a parsed flow line; generator is
+    called as generator(element_type, element, flow_line_as_dict) and
+    returns the summary object for that element.
+    """
+    def __init__(self, element_type, generator):
+        (self.element_type, self.generator) = (element_type, generator)
+
+# The flow elements that are summarized, each paired with the function that
+# turns the parsed element into a summary object. from_line() walks this
+# list in order and looks every element type up in the parsed flow line.
+OUTPUT_FORMAT = [
+    OutputFormat("eth", element_eth_get),
+    OutputFormat("ipv4", element_ipv4_get),
+    OutputFormat("ipv6", element_ipv6_get),
+    OutputFormat("udp", element_dst_port_get),
+    OutputFormat("tcp", element_dst_port_get),
+    OutputFormat("eth_type", element_passthrough_get),
+    OutputFormat("in_port", element_passthrough_get)
+    ]
+
+
+# Maps an element type to a dotted "type.field" name.
+# NOTE(review): this table is not referenced anywhere else in this file --
+# confirm whether it is still needed.
+ELEMENT_KEY = {
+    "udp": "udp.dst",
+    "tcp": "tcp.dst"
+    }
+
+
+def input_get(args):
+    """ Return the open file object the flow lines are read from.
+
+    The choice depends on the parsed program parameters:
+      - top mode, or --flow-file pipe: the output of a freshly started
+        "ovs-dpctl dump-flows" command, with its stderr merged into stdout;
+      - --script without --flow-file: stdin, reopened unbuffered;
+      - --script with --flow-file: the named file, opened for reading.
+    """
+    # args.flowFile == "pipe" is used for testing.
+    if (args.top or args.flowFile == "pipe"):
+        child = subprocess.Popen(["ovs-dpctl", "dump-flows"],
+                                 stderr=subprocess.STDOUT,
+                                 stdout=subprocess.PIPE)
+        return child.stdout
+
+    # From here on --script was passed.
+    if (args.flowFile is None):
+        return os.fdopen(sys.stdin.fileno(), 'r', 0)
+    return open(args.flowFile, "r")
+
+
+def args_get():
+    """ Parse the program parameters and configure logging.
+
+    Returns the argparse namespace with:
+      - flowFile: the value of -f/--flow-file, or None when it is not given;
+      - verbose:  logging.DEBUG when -V/--verbose is given, otherwise
+                  logging.INFO (also used as the logging level);
+      - top:      False when -s/--script is given, otherwise True.
+    """
+
+    parser = argparse.ArgumentParser(
+                          formatter_class=argparse.RawDescriptionHelpFormatter,
+                          description=__doc__)
+    # The "version" keyword of ArgumentParser is deprecated; the version
+    # action provides the same -v/--version options.
+    parser.add_argument("-v", "--version", action="version",
+                      version="@VERSION@")
+    ##
+    # None is a special value indicating to read flows from stdin.
+    # This handles the case
+    #   ovs-dpctl dump-flows | ovs-dpctl-top --script
+    parser.add_argument("-f", "--flow-file", dest="flowFile", default=None,
+                     help="file containing flows from ovs-dpctl dump-flows")
+    parser.add_argument("-V", "--verbose", dest="verbose",
+                      default=logging.INFO,
+                      action="store_const", const=logging.DEBUG,
+                      help="enable debug level verbosity")
+
+    parser.add_argument("-s", "--script", dest="top", action="store_false",
+                      help="Run from a script (no user interface)")
+
+    args = parser.parse_args()
+
+    logging.basicConfig(level=args.verbose)
+
+    return args
+
+###
+# Code to parse a single line in dump-flow
+###
+# key(values), for example eth(src=...,dst=...)
+FLOW_CMPND = re.compile("([\w]+)\((.+)\)")
+# key=value, as found inside a compound, for example src=00:50:56:b4:4e:f8
+FLOW_CMPND_ELEMENT = re.compile("([\w:]+)=([/\.\w:]+)")
+# key:value, for example packets:1 or used:-0.703s
+FLOW_ELEMENT = re.compile("([\w]+):([-\.\w]+)")
+
+
+def flow_line_iter(line):
+    """ Split a dump-flow line, or the body of a compound, into elements.
+
+    Returns a list. The split happens on every comma except one that is
+    followed by a closing parenthesis with no opening parenthesis in
+    between, which keeps "eth(src=...,dst=...)" in one piece. The actions
+    element is not split properly by this rule, but it is not needed.
+    """
+    separator = re.compile(",(?![^(]*\\))")
+    return separator.split(line)
+
+
+def flow_line_compound_parse(compound):
+    """ Parse the body of a compound element into a dictionary.
+
+    For example
+    src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
+    which is in
+    eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
+    becomes {"src": "00:50:56:b4:4e:f8", "dst": "33:33:00:01:00:03"}.
+
+    A nested key(values) element is parsed recursively and stored under its
+    own key. When nothing in compound looks like key=value or key(values),
+    for example the "4" of in_port(4), compound is returned unchanged.
+    """
+    result = {}
+    for element in flow_line_iter(compound):
+        # Test for a nested compound first, the same way flow_line_to_dict
+        # does. Otherwise the key=value pattern also matches inside the
+        # nested parentheses and the first inner pair leaks into this level.
+        match = FLOW_CMPND.search(element)
+        if (match):
+            result[match.group(1)] = flow_line_compound_parse(match.group(2))
+            continue
+
+        match = FLOW_CMPND_ELEMENT.search(element)
+        if (match):
+            result[match.group(1)] = match.group(2)
+
+    if (not result):
+        return compound
+    return result
+
+
+def flow_line_to_dict(line):
+    """ Convert line to a hierarchy of dictionaries.
+
+    key(values) elements become nested dictionaries (or the plain value
+    when it has no inner structure) and key:value elements become strings.
+    Everything from "actions:" onwards is ignored.
+
+    Raises ValueError when an element matches neither form.
+    """
+    result = {}
+    # ignore anything after actions.
+    index = line.find("actions:")
+    if (index > -1):
+        line = line[:index].rstrip(", ")
+
+    for element in flow_line_iter(line):
+        match = FLOW_CMPND.search(element)
+        if (match):
+            key = match.group(1)
+            value = match.group(2)
+            result[key] = flow_line_compound_parse(value)
+            continue
+
+        match = FLOW_ELEMENT.search(element)
+        if (match):
+            key = match.group(1)
+            value = match.group(2)
+            result[key] = value
+        else:
+            # Format the message here: exceptions, unlike logging calls, do
+            # not interpolate extra arguments into the first one.
+            raise ValueError("can't parse %s in %s" % (element, line))
+    return result
+
+
+# pylint: disable-msg=R0903
+class SumData:
+    """ One summarized flow element: its type, its text and its totals.
+
+    repr() of an instance is the key SumDB stores it under.
+    str() of an instance is the human readable form.
+    """
+
+    def __init__(self, obj_type, element, packets):
+        self.type = obj_type
+        self.element = element
+        # Number of dump-flow lines folded into this entry so far.
+        self.count = 1
+        self.packets = int(packets)
+
+    def __iadd__(self, other):
+        """ Fold the count and packets of other into this entry. """
+        # A sanity check to make sure we don't mix data: other has to be of
+        # the very same class or be some kind of SumData.
+        related = ((self.__class__ == other.__class__) or
+                   isinstance(other, SumData))
+        if (not related):
+            raise ValueError("adding two unrelated types")
+
+        self.count += other.count
+        self.packets += other.packets
+        return self
+
+    def __str__(self):
+        fields = (self.type, self.element, self.count, self.packets)
+        return "%s %s %s %s" % fields
+
+    def flow_get(self):
+        """ Return the text of the flow element. """
+        return self.element
+
+    def __repr__(self):
+        """ Return the key used for this entry in the SumDB table. """
+        return self.element
+
+
+class SumMaskData(SumData):
+    """ A SumData whose table key differs from the text that is shown.
+
+    element is what flow_get() and str() present; key is what repr()
+    returns and therefore what the entry is aggregated under.
+    """
+    def __init__(self, obj_type, element, packets, key):
+        """ Store key next to the regular SumData fields. """
+        self.key = key
+        SumData.__init__(self, obj_type, element, packets)
+
+    def __repr__(self):
+        """ Return the key used for this entry in the SumDB table. """
+        return self.key
+
+
+class SumDB:
+    """ Collects summary objects and merges those that share a key. """
+    def __init__(self):
+        # Maps repr(data) to the accumulated object for that key.
+        self._dict = {}
+        # A list of tuples (line number, line value)
+        self.error_lines = []
+        # Incremented by every add() call.
+        self.flow_count = 0
+
+    def types_get(self):
+        """ Return the set of types of the stored objects. """
+        return set(entry.type for entry in self._dict.values())
+
+    def add(self, data):
+        """ Store data, or fold it into the object already stored under
+        the same repr(). """
+        known = self._dict.get(repr(data), None)
+        if (known is not None):
+            known += data
+            merged = known
+        else:
+            # Keep a copy so that later merges never touch the caller's
+            # object.
+            merged = copy.copy(data)
+        self._dict[repr(merged)] = merged
+        self.flow_count += 1
+
+    def values_in_order(self, order_type):
+        """ Return the stored objects as a list, largest first.
+
+        Objects are ranked by packets and then by count. A true order_type
+        limits the result to objects of that type.
+        """
+        candidates = self._dict.values()
+        if (order_type):
+            candidates = [entry for entry in candidates
+                          if (entry.type == order_type)]
+        ranked = sorted(candidates,
+                        key=operator.attrgetter("packets", "count"))
+        ranked.reverse()
+        return ranked
+
+    def error_line_add(self, line_count, line_value):
+        """ Remember a line that did not yield any summary object. """
+        self.error_lines.append((line_count, line_value))
+
+    def stats_get(self):
+        """ Return the totals as a dictionary with the keys "flow_total"
+        and "flow_errors". """
+        return {"flow_total": self.flow_count,
+                "flow_errors": len(self.error_lines)}
+
+
+def from_line(line):
+    """ Return the summary objects found in one dump-flow line.
+
+    The line is parsed into a dictionary and, for every entry of
+    OUTPUT_FORMAT whose element type has a true value in it, the matching
+    generator builds one summary object. The list is empty when no element
+    type is present.
+    """
+    flow_dict = flow_line_to_dict(line)
+
+    found = []
+    for output_format in OUTPUT_FORMAT:
+        parsed = flow_dict.get(output_format.element_type)
+        if (not parsed):
+            continue
+        found.append(output_format.generator(output_format.element_type,
+                                             parsed, flow_dict))
+    return found
+
+
+def flows_process(args):
+    """ Read all flow lines from the selected input and summarize them.
+
+    Returns the SumDB holding one merged entry per flow concept. A line
+    that cannot be parsed, or that yields no summary object, is recorded
+    with its zero-based line number through SumDB.error_line_add() and
+    does not stop the run.
+    """
+    sum_db = SumDB()
+    line_count = 0
+
+    with input_get(args) as ihdl:
+        # NOTE(review): readline() is kept instead of iterating over the
+        # file object -- presumably to avoid read-ahead on pipes; confirm.
+        while True:
+            line = ihdl.readline()
+            if (not line):
+                # end of input
+                break
+
+            logging.debug("flow %s", line)
+            try:
+                matches = from_line(line)
+            except ValueError as parse_error:
+                # flow_line_to_dict rejects lines it cannot parse, for
+                # example error text from ovs-dpctl, whose stderr is merged
+                # into the input. Count them as errors instead of aborting.
+                logging.debug("skipping line %d: %s", line_count,
+                              parse_error)
+                matches = []
+
+            for match in matches:
+                sum_db.add(match)
+
+            if (not matches):
+                sum_db.error_line_add(line_count, line)
+            line_count += 1
+    return sum_db
+
+
+def get_terminal_size():
+    """ Return the terminal size as a (rows, columns) tuple.
+
+    stdin, stdout and stderr are asked in that order with the TIOCGWINSZ
+    ioctl and the first one that answers wins, so a redirected stderr no
+    longer hides the size known through stdin or stdout. When none of them
+    answers, (25, 80) is returned.
+    """
+    for fd_io in [0, 1, 2]:
+        try:
+            return struct.unpack('hh',
+                                 fcntl.ioctl(fd_io,
+                                             termios.TIOCGWINSZ, '1234'))
+        except IOError:
+            # The ioctl failed for this descriptor; try the next one.
+            continue
+
+    return (25, 80)
+
+
+def render_flow(value, flow_width):
+    """ Shorten value to at most flow_width characters.
+
+    A value that already fits is returned unchanged. Otherwise the
+    beginning of value is kept and " ... " is appended to make it clear
+    that the flow was cut, giving exactly flow_width characters.
+    """
+    ellipses = " ... "
+    if (len(value) <= flow_width):
+        return value
+    if (flow_width <= len(ellipses)):
+        # No room for any flow text next to the marker.
+        return ellipses[:max(flow_width, 0)]
+    # Keep the head of the flow. Slicing with a negative index here would
+    # drop a fixed-size tail instead and leave long flows too wide.
+    return value[:flow_width - len(ellipses)] + ellipses
+
+
+# pylint: disable-msg=R0913
+def render(flow, packets, count, console_width, title=False):
+    """ Format one output row: the flow, its packet total and line count.
+
+    The packets and count columns are 7 characters wide and the flow column
+    takes what is left of console_width, but never less than 20 characters.
+    With title set every column is centered; otherwise the flow is
+    shortened if needed and left aligned, and the numbers are right
+    aligned.
+    """
+    packet_width = 7
+    count_width = 7
+    spaces = 3
+
+    # 20 is the floor: on a smaller console the row simply gets too wide.
+    maximum_flow_width = max(
+        console_width - packet_width - count_width - spaces, 20)
+
+    if (title):
+        columns = {"flow": flow.center(maximum_flow_width),
+                   "packets": str(packets).center(packet_width),
+                   "count": str(count).center(count_width)}
+    else:
+        shortened = render_flow(flow, maximum_flow_width)
+        columns = {"flow": shortened.ljust(maximum_flow_width),
+                   "packets": str(packets).rjust(packet_width),
+                   "count": str(count).rjust(count_width)}
+
+    return "%(flow)s %(packets)s %(count)s" % columns
+
+
+class CursesScreen:
+    """ Context manager that starts curses on entry and restores the
+    terminal on exit, for use in a with clause. """
+    def __init__(self):
+        self.stdscr = None
+
+    def __enter__(self):
+        """ Start curses in cbreak mode, without echo and with keypad
+        handling, and return the screen window. """
+        screen = curses.initscr()
+        self.stdscr = screen
+        curses.cbreak()
+        curses.noecho()
+        screen.keypad(1)
+        return screen
+
+    def __exit__(self, ignored1, ignored2, ignored3):
+        """ Undo the settings made on entry and end curses. Nothing is
+        returned, so an exception raised in the with body propagates. """
+        curses.nocbreak()
+        self.stdscr.keypad(0)
+        curses.echo()
+        curses.endwin()
+
+
+def flows_script_show(sum_db, console_width):
+    """ Return the summary of sum_db as a list of text lines.
+
+    The list starts with a centered heading, the totals and the column
+    titles, followed by one row per summarized flow concept, largest
+    packet count first.
+    """
+    header = [
+        "Flow summary".center(console_width),
+        " Total: %(flow_total)s  errors: %(flow_errors)s" %
+        (sum_db.stats_get()),
+        render("FLOW", "PACKETS", "COUNT", console_width, True),
+        ]
+
+    # Passing None lists every type together; pass a single flow type to
+    # values_in_order() instead to get the rows of that type only.
+    rows = [render(value.flow_get(), value.packets, value.count,
+                   console_width)
+            for value in sum_db.values_in_order(None)]
+
+    return header + rows
+
+
+def flows_top(args):
+    """ Handle top like behavior.
+
+    About once a second the flows are read again and the summary is
+    redrawn on the curses screen, until 'q' is pressed. The last summary is
+    then printed once more on the regular terminal.
+    """
+
+    lines = []
+    console_height = 0
+    with CursesScreen() as stdscr:
+        key = 'X'
+        stdscr.nodelay(1)
+        while (key != ord('q')):
+            sum_db = flows_process(args)
+            (console_height, console_width) = stdscr.getmaxyx()
+            lines = flows_script_show(sum_db, console_width)
+            # Wipe the previous frame; otherwise the rows of flows that
+            # have disappeared since then stay on the screen.
+            stdscr.erase()
+            for (row, line) in enumerate(lines[:console_height]):
+                try:
+                    # Clip to the window width so that a long row cannot
+                    # wrap onto the next one.
+                    stdscr.addnstr(row, 0, line, console_width)
+                except curses.error:
+                    # curses raises after drawing into the bottom-right
+                    # cell; the text is on the screen, so carry on.
+                    pass
+            stdscr.refresh()
+            time.sleep(1)
+            key = stdscr.getch()
+
+    # repeat output
+    for line in lines[:console_height]:
+        print(line)
+
+
+def flows_script(args):
+    """ Handle the --script option: summarize the input once and print
+    the result. """
+
+    (_rows, console_width) = get_terminal_size()
+    summary = flows_script_show(flows_process(args), console_width)
+    for line in summary:
+        print(line)
+
+
+def main():
+    """ Run the program. Return 0 on success or 1 when interrupted from
+    the keyboard. """
+    args = args_get()
+
+    try:
+        handler = flows_top if (args.top) else flows_script
+        handler(args)
+    except KeyboardInterrupt:
+        return 1
+    return 0
+
+# Script entry point: the return value of main() becomes the exit status.
+# NOTE(review): sys.exit() runs before the test case class further down is
+# defined, so the tests are not reachable when this file is run directly.
+if __name__ == '__main__':
+    sys.exit(main())
+
+
+##
+# Test case beyond this point.
+##
+# pylint: disable-msg=R0904
+class TestsuiteFlowParse(unittest.TestCase):
+    """
+    Checks that a flow line is parsed into a hierarchy of dictionaries.
+    """
+    def test_flow_parse(self):
+        """ Parse one IPv6/UDP flow, with a positive and a negative
+        "used" time. """
+        # The two flow lines differ in the value of "used" only.
+        template = ("in_port(4),eth(src=00:50:56:b4:4e:f8,"
+                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"
+                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
+                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"
+                    "udp(src=61252,dst=5355), packets:1, bytes:92, "
+                    "used:%s, actions:3,8,11,14,17,20,23,26,29,32,35,"
+                    "38,41,44,47,50,53,56,59,62,65")
+
+        flow_dict = flow_line_to_dict(template % "0.703s")
+        expected = [("eth", "src", "00:50:56:b4:4e:f8"),
+                    ("eth", "dst", "33:33:00:01:00:03"),
+                    ("ipv6", "src", "fe80::55bf:fe42:bc96:2812"),
+                    ("ipv6", "dst", "ff02::1:3")]
+        for (element, field, value) in expected:
+            self.assertEqual(flow_dict[element][field], value)
+
+        # A leading minus sign must survive parsing.
+        flow_dict = flow_line_to_dict(template % "-0.703s")
+        self.assertEqual(flow_dict["used"], "-0.703s")
-- 
1.7.10.4

_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev

Reply via email to