Every second, ovs-dpctl dump-flows content is aggregated and presented in decreasing order by packet count. Several flow concepts are extracted and aggregated individually: - MAC address pairs - IPv4 and IPv6 address pairs - eth types - ip protocol - destination port - in port
Signed-off-by: Mark Hamilton <[email protected]> --- utilities/ovs-dpctl-top.in | 564 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 564 insertions(+) create mode 100755 utilities/ovs-dpctl-top.in diff --git a/utilities/ovs-dpctl-top.in b/utilities/ovs-dpctl-top.in new file mode 100755 index 0000000..0ef332c --- /dev/null +++ b/utilities/ovs-dpctl-top.in @@ -0,0 +1,564 @@ +#! @PYTHON@ +# +# Copyright (c) 2013 Nicira, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Top like behavior for ovs-dpctl dump-flows output. + +This program summarizes ovs-dpctl flow content by aggregrating the number +of packets and occurances of key flow concepts. These concepts are: + - the source and destination MAC address + - the source and destination IPv4 address + - the source and destination IPv6 address + - ethernet types + - IP protocol + - destination port and in port + +Output shows three values: + - FLOW - the flow concept + - PACKETS - the total number of packets containing the flow concept + - COUNT - the number of lines +in the dump-flow output contain the flow concept. 
+ +Expected usage: + +$ ovs-dpctl-top + +or to run in a script: +$ ovs-dpctl dump-flows > dump-flows.log +$ ovs-dpctl-top.py --script --flow-file dump-flows.log + +""" + +# pylint: disable-msg=C0103 + +import sys +import os +import argparse +import logging +import re +import unittest +import copy +import curses +import time +import operator +import subprocess +import netaddr +import fcntl +import struct +import termios + +MAC_ADDR = "([\da-fA-F]{2}[:]?){6}" +IPV4_ADDR = "([\d]{1,3}[\.]?){4}" +IPV6_ADDR = "([\da-fA-F]{1,4}:){7}([\da-fA-F]{1,4})" + + +def element_eth_get(element_type, element, flow_line_as_dict): + """ Extract eth frame src and dst from a dump-flow element.""" + fmt = "%s(src=%s,dst=%s)" + + element = fmt % (element_type, element["src"], element["dst"]) + return SumData(element_type, element, flow_line_as_dict["packets"]) + + +def element_ipv4_get(element_type, element, flow_line_as_dict): + """ Extract src and dst from a dump-flow element.""" + fmt = "%s(src=%s,dst=%s)" + element_show = fmt % (element_type, element["src"], element["dst"]) + element_key = fmt % (element_type, + netaddr.IPNetwork(element["src"]).ipv4().network, + netaddr.IPNetwork(element["dst"]).ipv4().network) + + return SumMaskData(element_type, element_show, + flow_line_as_dict["packets"], element_key) + + +def element_ipv6_get(element_type, element, flow_line_as_dict): + """ Extract src and dst from a dump-flow element.""" + + fmt = "%s(src=%s,dst=%s)" + element_show = fmt % (element_type, element["src"], element["dst"]) + + src = str(netaddr.IPNetwork(element["src"]).ipv6().network) + dst = str(netaddr.IPNetwork(element["dst"]).ipv6().network) + element_key = fmt % (element_type, src, dst) + + return SumMaskData(element_type, element_show, + flow_line_as_dict["packets"], element_key) + + +def element_dst_port_get(element_type, element, flow_line_as_dict): + """ Extract src and dst from a dump-flow element.""" + return SumData(element_type, + "%s(dst=%s)" % (element_type, 
element["dst"]), + flow_line_as_dict["packets"]) + + +def element_passthrough_get(element_type, element, flow_line_as_dict): + """ Extract src and dst from a dump-flow element.""" + return SumData(element_type, "%s(%s)" % (element_type, element), + flow_line_as_dict["packets"]) + + +# pylint: disable-msg=R0903 +class OutputFormat: + """ Holds element_type and function to extract element value. """ + def __init__(self, element_type, generator): + self.element_type = element_type + self.generator = generator + +OUTPUT_FORMAT = [ + OutputFormat("eth", element_eth_get), + OutputFormat("ipv4", element_ipv4_get), + OutputFormat("ipv6", element_ipv6_get), + OutputFormat("udp", element_dst_port_get), + OutputFormat("tcp", element_dst_port_get), + OutputFormat("eth_type", element_passthrough_get), + OutputFormat("in_port", element_passthrough_get) + ] + + +ELEMENT_KEY = { + "udp": "udp.dst", + "tcp": "tcp.dst" + } + + +def input_get(args): + """ Determine where to read flow content based on program parameters. """ + + if (args.top or args.flowFile == "pipe"): + # args.flowFile == "pipe" is used for testing. + return subprocess.Popen(["ovs-dpctl", "dump-flows"], + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE).stdout + else: + # else --script was passed + if (args.flowFile is None): + return os.fdopen(sys.stdin.fileno(), 'r', 0) + else: + return open(args.flowFile, "r") + + +def args_get(): + """ read program parameters handle any necessary validation of input. """ + + parser = argparse.ArgumentParser(version="@VERSION@", + formatter_class=argparse.RawDescriptionHelpFormatter, + description=__doc__) + ## + # None is a special value indicating to read flows from stdin. 
def args_get():
    """ Read program parameters and handle validation of input.

    Also configures logging verbosity from --verbose.
    """

    parser = argparse.ArgumentParser(
        version="@VERSION@",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__)
    ##
    # None is a special value indicating to read flows from stdin.
    # This handles the case
    #   ovs-dpctl dump-flows | ovs-dpctl-flows.py
    parser.add_argument("-f", "--flow-file", dest="flowFile", default=None,
                        help="file containing flows from ovs-dpctl dump-flow")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.INFO,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")

    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args

###
# Code to parse a single line in dump-flow
###
# key(values)
FLOW_CMPND = re.compile(r"([\w]+)\((.+)\)")
# key:value
FLOW_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
FLOW_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")


def flow_line_iter(line):
    """ Iterate over flow dump elements.

    Splits on commas except those inside parentheses.  The actions
    element is not split properly, but it is not needed.
    """
    return re.split(r",(?![^(]*\))", line)


def flow_line_compound_parse(compound):
    """ Parse a compound element, for example
        src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
    which is inside
        eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)

    Returns a dict, or the original string when nothing matched.
    """
    result = {}
    for element in flow_line_iter(compound):
        match = FLOW_CMPND_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value

        # A nested key(values) compound overrides the simple match.
        match = FLOW_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)

    if (len(result.keys()) == 0):
        return compound
    return result
def flow_line_to_dict(line):
    """ Convert a dump-flow line to a hierarchy of dictionaries.

    Raises ValueError when an element matches neither the compound
    key(values) form nor the simple key:value form.
    """
    result = {}
    # Ignore anything after actions: -- the action list is not summarized.
    index = line.find("actions:")
    if (index > -1):
        line = line[:index].rstrip(", ")

    for element in flow_line_iter(line):
        match = FLOW_CMPND.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = flow_line_compound_parse(value)
            continue

        match = FLOW_ELEMENT.search(element)
        if (match):
            key = match.group(1)
            value = match.group(2)
            result[key] = value
        else:
            # Fixed: the arguments were previously passed logging-style
            # to ValueError and never interpolated into the message.
            raise ValueError("can't parse %s in %s" % (element, line))
    return result


# pylint: disable-msg=R0903
class SumData:
    """ Interface that all data going into SumDB must implement.
    __repr__ is used as key into the SumDB singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, obj_type, element, packets):
        # Count is the number of lines in the dump-flow log.
        self.type = obj_type
        self.element = element
        self.count = 1
        self.packets = int(packets)

    def __iadd__(self, other):
        """ Accumulate another summary item into this one. """
        # a sanity check to make sure we don't mix data.
        if ((self.__class__ != other.__class__) and
            not isinstance(other, SumData)):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        return self

    def __str__(self):
        return "%s %s %s %s" % (self.type, self.element, self.count,
                                self.packets)

    def flow_get(self):
        """ Human readable flow value shown in the output. """
        return self.element

    def __repr__(self):
        """ Used as key in the SumDB table. """
        return self.element


class SumMaskData(SumData):
    """ Summary item whose aggregation key differs from its display value.

    key is the masked (network-collapsed) form used to group flows,
    while element remains the human readable representation.
    """
    def __init__(self, obj_type, element, packets, key):
        SumData.__init__(self, obj_type, element, packets)
        self.key = key

    def __repr__(self):
        """ Used as key in the SumDB table. """
        return self.key
""" + def __init__(self): + self._dict = {} + # A tuple (line number, line value) + self.error_lines = [] + self.flow_count = 0 + + def types_get(self): + """ Return the set of types stored in the singleton. """ + types = set((ii.type for ii in self._dict.values())) + return types + + def add(self, data): + """ Collect dump-flow data to sum number of times item appears. """ + current = self._dict.get(repr(data), None) + if (current is None): + current = copy.copy(data) + else: + current += data + self._dict[repr(current)] = current + self.flow_count += 1 + + def values_in_order(self, order_type): + """ Return a list of items in order maximum first. """ + if (order_type): + values = [ii for ii in self._dict.values() + if (ii.type == order_type)] + else: + values = self._dict.values() + values = [(ii.packets, ii.count, ii) for ii in values] + values.sort(key=operator.itemgetter(0, 1)) + values.reverse() + values = [ii[2] for ii in values] + return values + + def error_line_add(self, line_count, line_value): + """ Store lines that did not match any parsing. """ + self.error_lines.append((line_count, line_value)) + + def stats_get(self): + """ Return statistics in a form of a dictionary. """ + return {"flow_total": self.flow_count, + "flow_errors": len(self.error_lines)} + + +def from_line(line): + """ Search for content in a line. """ + result = [] + flow_dict = flow_line_to_dict(line) + + for output_format in OUTPUT_FORMAT: + element_exists = flow_dict.get(output_format.element_type) + if (element_exists): + element = output_format.generator(output_format.element_type, + element_exists, flow_dict) + result.append(element) + + return result + + +def flows_process(args): + """ iterate over flow input to generate summary and final output. 
""" + done = False + sum_db = SumDB() + line_count = 0 + + with input_get(args) as ihdl: + while (not done): + line = ihdl.readline() + if (len(line) == 0): + # end of input + break + + logging.debug("flow %s", line) + matches = from_line(line) + for match in matches: + sum_db.add(match) + + if (len(matches) == 0): + sum_db.error_line_add(line_count, line) + line_count += 1 + return sum_db + + +def get_terminal_size(): + """ + return column width and height of the terminal + """ + for fd_io in [0, 1, 2]: + try: + result = struct.unpack('hh', + fcntl.ioctl(fd_io, + termios.TIOCGWINSZ, '1234')) + except IOError: + result = None + continue + + if (result is None): + result = (25, 80) + + return result + + +def render_flow(value, flow_width): + """ truncate really long flow and insert ellipses to help make it clear. + """ + ellipses = " ... " + if (len(value) > flow_width): + value = value[:-(flow_width - len(ellipses))] + ellipses + return value + + +# pylint: disable-msg=R0913 +def render(flow, packets, count, console_width, title=False): + """ Extract what is necessary to print out summary. + Column total needs to be below 80 characters. + """ + packet_width = 7 + count_width = 7 + spaces = 3 + + maximum_flow_width = console_width - packet_width - count_width - spaces + if (maximum_flow_width < 20): + # just give up. console is too small + maximum_flow_width = 20 + + fmt = "%(flow)s %(packets)s %(count)s" + if (title): + return fmt % {"flow": flow.center(maximum_flow_width), + "packets": str(packets).center(packet_width), + "count": str(count).center(count_width)} + else: + flow = render_flow(flow, maximum_flow_width).ljust(maximum_flow_width) + return fmt % {"flow": flow, + "packets": str(packets).rjust(packet_width), + "count": str(count).rjust(count_width)} + + +class CursesScreen: + """ support with clause. 
""" + def __init__(self): + self.stdscr = None + + def __enter__(self): + self.stdscr = curses.initscr() + curses.cbreak() + curses.noecho() + self.stdscr.keypad(1) + return self.stdscr + + def __exit__(self, ignored1, ignored2, ignored3): + curses.nocbreak() + self.stdscr.keypad(0) + curses.echo() + curses.endwin() + + +def flows_script_show(sum_db, console_width): + """ shows flows based on --script parameter.""" + lines = [] + + lines.append("Flow summary".center(console_width)) + lines.append(" Total: %(flow_total)s errors: %(flow_errors)s" % + (sum_db.stats_get())) + + lines.append(render("FLOW", "PACKETS", "COUNT", console_width, True)) + # Change to this loop, if you want output ordered by type + #for flow_type in flow_types: + #for value in sum_db.values_in_order(flow_type): + #print render(value.flow_get(), value.packets, value.count) + for value in sum_db.values_in_order(None): + lines.append(render(value.flow_get(), value.packets, value.count, + console_width)) + + return lines + + +def flows_top(args): + """ handles top like behavior. """ + + lines = [] + with CursesScreen() as stdscr: + key = 'X' + stdscr.nodelay(1) + while (key != ord('q')): + sum_db = flows_process(args) + (console_height, console_width) = stdscr.getmaxyx() + count = 0 + lines = flows_script_show(sum_db, console_width) + for line in lines: + stdscr.addstr(count, 0, line) + count += 1 + if (count >= console_height): + break + stdscr.refresh() + time.sleep(1) + key = stdscr.getch() + + # repeat output + for line in lines[:console_height]: + print line + + +def flows_script(args): + """ handles --script option. """ + + sum_db = flows_process(args) + (_, console_width) = get_terminal_size() + for line in flows_script_show(sum_db, console_width): + print line + + +def main(): + """ Return 0 on sucess or 1 on failure. 
""" + args = args_get() + + try: + if (args.top): + flows_top(args) + else: + flows_script(args) + except KeyboardInterrupt: + return 1 + return 0 + +if __name__ == '__main__': + sys.exit(main()) + + +## +# Test case beyond this point. +## +# pylint: disable-msg=R0904 +class TestsuiteFlowParse(unittest.TestCase): + """ + parse flow into hierarchy of dictionaries. + """ + def test_flow_parse(self): + """ test_flow_parse. """ + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\ + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\ + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\ + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\ + "udp(src=61252,dst=5355), packets:1, bytes:92, "\ + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\ + "38,41,44,47,50,53,56,59,62,65" + + flow_dict = flow_line_to_dict(line) + self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8") + self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03") + self.assertEqual(flow_dict["ipv6"]["src"], "fe80::55bf:fe42:bc96:2812") + self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3") + + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\ + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\ + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\ + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\ + "udp(src=61252,dst=5355), packets:1, bytes:92, "\ + "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\ + "38,41,44,47,50,53,56,59,62,65" + + flow_dict = flow_line_to_dict(line) + self.assertEqual(flow_dict["used"], "-0.703s") -- 1.7.10.4 _______________________________________________ dev mailing list [email protected] http://openvswitch.org/mailman/listinfo/dev
