This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 6dc889e0b TEZ-4666: Migrate tez-tools python scripts to python3 (#444) (Raghav Aggarwal reviewed by Laszlo Bodor)
6dc889e0b is described below
commit 6dc889e0b69058fe78c3e194d115a42ecbe3495e
Author: Raghav Aggarwal <[email protected]>
AuthorDate: Fri Jan 16 19:35:28 2026 +0530
TEZ-4666: Migrate tez-tools python scripts to python3 (#444) (Raghav Aggarwal reviewed by Laszlo Bodor)
---
tez-tools/counter-diff/README.md | 6 +-
tez-tools/counter-diff/counter-diff.py | 361 ++++++++++---------
tez-tools/swimlanes/amlogparser.py | 519 +++++++++++++++-------------
tez-tools/swimlanes/swimlane.py | 494 +++++++++++++++++---------
tez-tools/swimlanes/yarn-swimlanes.sh | 4 +-
tez-tools/tez-log-split/logsplit.py | 27 +-
tez-tools/tez-log-split/tez-log-splitter.sh | 2 +-
7 files changed, 839 insertions(+), 574 deletions(-)
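The bulk of the patch below is a mechanical Python 2 to Python 3 conversion plus black-style reformatting. As a quick orientation, here is a minimal sketch of the recurring substitutions (illustrative only, not lifted verbatim from the patch):

    import io
    import sys
    from urllib.request import urlopen  # replaces urllib2.urlopen

    d = {"a": 1}

    # print statement -> print() function, with file= for stderr output
    print("hello", file=sys.stderr)

    # dict.has_key(k) -> membership test
    if "a" in d:
        pass

    # StringIO.StringIO -> io.StringIO, xrange -> range
    buf = io.StringIO()
    indexes = range(3)

    # filter()/map() return lazy iterators on Python 3; wrap in list() where a list is required
    evens = list(filter(lambda n: n % 2 == 0, range(10)))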
diff --git a/tez-tools/counter-diff/README.md b/tez-tools/counter-diff/README.md
index a88a32f24..811c26b79 100644
--- a/tez-tools/counter-diff/README.md
+++ b/tez-tools/counter-diff/README.md
@@ -26,11 +26,11 @@ Before using the tool, make sure to install texttable using `pip install texttab
To use the tool, run
-`python counter-diff.py dag_1.zip dag_2.zip [--detail]`
+`python3 counter-diff.py dag_1.zip dag_2.zip [--detail]`
This will print counter output difference between the specified DAGs like this
-Example: `python counter-diff.py dag_1499978510619_0002_143.zip dag_1499978510619_0002_144.zip`
+Example: `python3 counter-diff.py dag_1499978510619_0002_143.zip dag_1499978510619_0002_144.zip`
```
+-------------------+-------------------------------------+----------------------------+----------------------------+-------------+
@@ -81,4 +81,4 @@ Example: `python counter-diff.py dag_1499978510619_0002_143.zip dag_149997851061
| | KILLED_TASKS | 0 | 0 | 0 |
| | TIME_TAKEN | 250198 | 68981 | -181217 |
+-------------------+-------------------------------------+----------------------------+----------------------------+-------------+
-```
\ No newline at end of file
+```
diff --git a/tez-tools/counter-diff/counter-diff.py b/tez-tools/counter-diff/counter-diff.py
index d7d07220c..a7ef9fbd7 100644
--- a/tez-tools/counter-diff/counter-diff.py
+++ b/tez-tools/counter-diff/counter-diff.py
@@ -17,187 +17,222 @@
# under the License.
#
-import imp, json, os, shutil, sys, tempfile, zipfile
+import json
+import os
+import shutil
+import sys
+import tempfile
+import zipfile
+
try:
- imp.find_module('texttable')
from texttable import Texttable
except ImportError:
- sys.stderr.write("Could not import Texttable\nRetry after 'pip install texttable'\n")
- exit()
+ print(
+ "Could not import Texttable. Retry after 'pip install texttable'",
+ file=sys.stderr,
+ )
+ sys.exit(1)
tmpdir = tempfile.mkdtemp()
+
def extract_zip(filename):
- file_dir = os.path.join(tmpdir, os.path.splitext(filename)[0])
- if not os.path.exists(file_dir):
- os.makedirs(file_dir)
+ file_dir = os.path.join(tmpdir, os.path.splitext(filename)[0])
+ if not os.path.exists(file_dir):
+ os.makedirs(file_dir)
- zip_ref = zipfile.ZipFile(os.path.abspath(filename), 'r')
- zip_ref.extractall(os.path.abspath(file_dir))
- zip_ref.close()
- return file_dir
+ zip_ref = zipfile.ZipFile(os.path.abspath(filename), "r")
+ zip_ref.extractall(os.path.abspath(file_dir))
+ zip_ref.close()
+ return file_dir
def diff(file1, file2):
- # extract ZIP files
- file1_dir = extract_zip(file1)
- file2_dir = extract_zip(file2)
-
- # tez debugtool writes json data to TEZ_DAG file whereas tez UI writes to dag.json
- # also in dag.json data is inside "dag" root node
- file1_using_dag_json = True
- dag_json_file1 = os.path.join(file1_dir, "dag.json")
- if os.path.isfile(dag_json_file1) == False:
- file1_using_dag_json = False
- dag_json_file1 = os.path.join(file1_dir, "TEZ_DAG")
- if os.path.isfile(dag_json_file1) == False:
- print "Unable to find dag.json/TEZ_DAG file inside the archive " + file1
- exit()
-
- file2_using_dag_json = True
- dag_json_file2 = os.path.join(file2_dir, "dag.json")
- if os.path.isfile(dag_json_file2) == False:
- file2_using_dag_json = False
- dag_json_file2 = os.path.join(file2_dir, "TEZ_DAG")
- if os.path.isfile(dag_json_file2) == False:
- print "Unable to find dag.json/TEZ_DAG file inside the archive " + file1
- exit()
-
- # populate diff table
- difftable = {}
- with open(dag_json_file1) as data_file:
- file1_dag_json = json.load(data_file)["dag"] if file1_using_dag_json else json.load(data_file)
- counters = file1_dag_json['otherinfo']['counters']
- for group in counters['counterGroups']:
- countertable = {}
- for counter in group['counters']:
- counterName = counter['counterName']
- countertable[counterName] = []
- countertable[counterName].append(counter['counterValue'])
-
- groupName = group['counterGroupName']
- difftable[groupName] = countertable
-
- # add other info
- otherinfo = file1_dag_json['otherinfo']
- countertable = {}
- countertable["TIME_TAKEN"] = [otherinfo['timeTaken']]
- countertable["COMPLETED_TASKS"] = [otherinfo['numCompletedTasks']]
- countertable["SUCCEEDED_TASKS"] = [otherinfo['numSucceededTasks']]
- countertable["FAILED_TASKS"] = [otherinfo['numFailedTasks']]
- countertable["KILLED_TASKS"] = [otherinfo['numKilledTasks']]
- countertable["FAILED_TASK_ATTEMPTS"] = [otherinfo['numFailedTaskAttempts']]
- countertable["KILLED_TASK_ATTEMPTS"] = [otherinfo['numKilledTaskAttempts']]
- difftable['otherinfo'] = countertable
-
- with open(dag_json_file2) as data_file:
- file2_dag_json = json.load(data_file)["dag"] if file2_using_dag_json else json.load(data_file)
- counters = file2_dag_json['otherinfo']['counters']
- for group in counters['counterGroups']:
- groupName = group['counterGroupName']
- if groupName not in difftable:
- difftable[groupName] = {}
- countertable = difftable[groupName]
- for counter in group['counters']:
- counterName = counter['counterName']
- # if counter does not exist in file1, add it with 0 value
- if counterName not in countertable:
- countertable[counterName] = [0]
- countertable[counterName].append(counter['counterValue'])
-
- # append other info
- otherinfo = file2_dag_json['otherinfo']
- countertable = difftable['otherinfo']
- countertable["TIME_TAKEN"].append(otherinfo['timeTaken'])
- countertable["COMPLETED_TASKS"].append(otherinfo['numCompletedTasks'])
- countertable["SUCCEEDED_TASKS"].append(otherinfo['numSucceededTasks'])
- countertable["FAILED_TASKS"].append(otherinfo['numFailedTasks'])
- countertable["KILLED_TASKS"].append(otherinfo['numKilledTasks'])
- countertable["FAILED_TASK_ATTEMPTS"].append(otherinfo['numFailedTaskAttempts'])
- countertable["KILLED_TASK_ATTEMPTS"].append(otherinfo['numKilledTaskAttempts'])
- difftable['otherinfo'] = countertable
-
- # if some counters are missing, consider it as 0 and compute delta difference
- for k,v in difftable.items():
- for key, value in v.items():
- # if counter value does not exisit in file2, add it with 0 value
- if len(value) == 1:
- value.append(0)
-
- # store delta difference
- delta = value[1] - value[0]
- value.append(("+" if delta > 0 else "") + str(delta))
-
- return difftable
+ # extract ZIP files
+ file1_dir = extract_zip(file1)
+ file2_dir = extract_zip(file2)
+
+ # tez debugtool writes json data to TEZ_DAG file whereas tez UI writes to dag.json
+ # also in dag.json data is inside "dag" root node
+ file1_using_dag_json = True
+ dag_json_file1 = os.path.join(file1_dir, "dag.json")
+ if not os.path.isfile(dag_json_file1):
+ file1_using_dag_json = False
+ dag_json_file1 = os.path.join(file1_dir, "TEZ_DAG")
+ if not os.path.isfile(dag_json_file1):
+ print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1)
+ sys.exit()
+
+ file2_using_dag_json = True
+ dag_json_file2 = os.path.join(file2_dir, "dag.json")
+ if not os.path.isfile(dag_json_file2):
+ file2_using_dag_json = False
+ dag_json_file2 = os.path.join(file2_dir, "TEZ_DAG")
+ if not os.path.isfile(dag_json_file2):
+ print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1)
+ sys.exit()
+
+ # populate diff table
+ difftable = {}
+ with open(dag_json_file1) as data_file:
+ file1_dag_json = (
+ json.load(data_file)["dag"]
+ if file1_using_dag_json
+ else json.load(data_file)
+ )
+
+ # Safe access to otherinfo and counters
+ otherinfo = file1_dag_json.get("otherinfo", {})
+ counters = otherinfo.get("counters", {})
+
+ # Iterate only if counterGroups exists
+ for group in counters.get("counterGroups", []):
+ countertable = {}
+ for counter in group["counters"]:
+ counterName = counter["counterName"]
+ countertable[counterName] = []
+ countertable[counterName].append(counter["counterValue"])
+
+ groupName = group["counterGroupName"]
+ difftable[groupName] = countertable
+
+ # add other info safely
+ countertable = {}
+ countertable["TIME_TAKEN"] = [otherinfo.get("timeTaken", 0)]
+ countertable["COMPLETED_TASKS"] = [otherinfo.get("numCompletedTasks", 0)]
+ countertable["SUCCEEDED_TASKS"] = [otherinfo.get("numSucceededTasks", 0)]
+ countertable["FAILED_TASKS"] = [otherinfo.get("numFailedTasks", 0)]
+ countertable["KILLED_TASKS"] = [otherinfo.get("numKilledTasks", 0)]
+ countertable["FAILED_TASK_ATTEMPTS"] = [
+ otherinfo.get("numFailedTaskAttempts", 0)
+ ]
+ countertable["KILLED_TASK_ATTEMPTS"] = [
+ otherinfo.get("numKilledTaskAttempts", 0)
+ ]
+ difftable["otherinfo"] = countertable
+
+ with open(dag_json_file2) as data_file:
+ file2_dag_json = (
+ json.load(data_file)["dag"]
+ if file2_using_dag_json
+ else json.load(data_file)
+ )
+
+ otherinfo = file2_dag_json.get("otherinfo", {})
+ counters = otherinfo.get("counters", {})
+
+ for group in counters.get("counterGroups", []):
+ groupName = group["counterGroupName"]
+ if groupName not in difftable:
+ difftable[groupName] = {}
+ countertable = difftable[groupName]
+ for counter in group["counters"]:
+ counterName = counter["counterName"]
+ # if counter does not exist in file1, add it with 0 value
+ if counterName not in countertable:
+ countertable[counterName] = [0]
+ countertable[counterName].append(counter["counterValue"])
+
+ # append other info safely
+ countertable = difftable["otherinfo"]
+ countertable["TIME_TAKEN"].append(otherinfo.get("timeTaken", 0))
+ countertable["COMPLETED_TASKS"].append(otherinfo.get("numCompletedTasks", 0))
+ countertable["SUCCEEDED_TASKS"].append(otherinfo.get("numSucceededTasks", 0))
+ countertable["FAILED_TASKS"].append(otherinfo.get("numFailedTasks", 0))
+ countertable["KILLED_TASKS"].append(otherinfo.get("numKilledTasks", 0))
+ countertable["FAILED_TASK_ATTEMPTS"].append(
+ otherinfo.get("numFailedTaskAttempts", 0)
+ )
+ countertable["KILLED_TASK_ATTEMPTS"].append(
+ otherinfo.get("numKilledTaskAttempts", 0)
+ )
+ difftable["otherinfo"] = countertable
+
+ # if some counters are missing, consider it as 0 and compute delta difference
+ for k, v in difftable.items():
+ for key, value in v.items():
+ # if counter value does not exisit in file2, add it with 0 value
+ if len(value) == 1:
+ value.append(0)
+
+ # store delta difference
+ delta = value[1] - value[0]
+ value.append(("+" if delta > 0 else "") + str(delta))
+
+ return difftable
+
def print_table(difftable, name1, name2, detailed=False):
- table = Texttable(max_width=0)
- table.set_cols_align(["l", "l", "l", "l", "l"])
- table.set_cols_valign(["m", "m", "m", "m", "m"])
- table.add_row(["Counter Group", "Counter Name", name1, name2, "delta"]);
- for k in sorted(difftable):
- # ignore task specific counters in default output
- if not detailed and ("_INPUT_" in k or "_OUTPUT_" in k):
- continue
-
- v = difftable[k]
- row = []
- # counter group. using shortname here instead of FQCN
- if detailed:
- row.append(k)
- else:
- row.append(k.split(".")[-1])
-
- # keys as list (counter names)
- row.append("\n".join(list(v.keys())))
-
- # counter values for dag1
- for key, value in v.items():
- if len(value) == 1:
- value.append(0)
- value.append(value[0] - value[1])
-
- # dag1 counter values
- name1Val = []
- for key, value in v.items():
- name1Val.append(str(value[0]))
- row.append("\n".join(name1Val))
-
- # dag2 counter values
- name2Val = []
- for key, value in v.items():
- name2Val.append(str(value[1]))
- row.append("\n".join(name2Val))
-
- # delta values
- deltaVal = []
- for key, value in v.items():
- deltaVal.append(str(value[2]))
- row.append("\n".join(deltaVal))
-
- table.add_row(row)
-
- print table.draw() + "\n"
+ table = Texttable(max_width=0)
+ table.set_cols_align(["l", "l", "l", "l", "l"])
+ table.set_cols_valign(["m", "m", "m", "m", "m"])
+ table.add_row(["Counter Group", "Counter Name", name1, name2, "delta"])
+ for k in sorted(difftable):
+ # ignore task specific counters in default output
+ if not detailed and ("_INPUT_" in k or "_OUTPUT_" in k):
+ continue
+
+ v = difftable[k]
+ row = []
+ # counter group. using shortname here instead of FQCN
+ if detailed:
+ row.append(k)
+ else:
+ row.append(k.split(".")[-1])
+
+ # keys as list (counter names)
+ row.append("\n".join(list(v.keys())))
+
+ # counter values for dag1
+ for key, value in v.items():
+ if len(value) == 1:
+ value.append(0)
+ value.append(value[0] - value[1])
+
+ # dag1 counter values
+ name1Val = []
+ for key, value in v.items():
+ name1Val.append(str(value[0]))
+ row.append("\n".join(name1Val))
+
+ # dag2 counter values
+ name2Val = []
+ for key, value in v.items():
+ name2Val.append(str(value[1]))
+ row.append("\n".join(name2Val))
+
+ # delta values
+ deltaVal = []
+ for key, value in v.items():
+ deltaVal.append(str(value[2]))
+ row.append("\n".join(deltaVal))
+
+ table.add_row(row)
+
+ print(table.draw() + "\n")
def main(argv):
- sysargs = len(argv)
- if sysargs < 2:
- print "Usage: python counter-diff.py dag_file1.zip dag_file2.zip [--detail]"
- return -1
+ sysargs = len(argv)
+ if sysargs < 2:
+ print("Usage: python3 counter-diff.py dag_file1.zip dag_file2.zip [--detail]")
+ return -1
+
+ file1 = argv[0]
+ file2 = argv[1]
+ difftable = diff(file1, file2)
- file1 = argv[0]
- file2 = argv[1]
- difftable = diff(file1, file2)
+ detailed = False
+ if sysargs == 3 and argv[2] == "--detail":
+ detailed = True
- detailed = False
- if sysargs == 3 and argv[2] == "--detail":
- detailed = True
+ print_table(
+ difftable, os.path.splitext(file1)[0], os.path.splitext(file2)[0], detailed
+ )
- print_table(difftable, os.path.splitext(file1)[0], os.path.splitext(file2)[0], detailed)
if __name__ == "__main__":
- try:
- sys.exit(main(sys.argv[1:]))
- finally:
- shutil.rmtree(tmpdir)
\ No newline at end of file
+ try:
+ sys.exit(main(sys.argv[1:]))
+ finally:
+ shutil.rmtree(tmpdir)
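Worth noting in counter-diff.py above: counters that appear in only one DAG are padded with a zero before the delta is computed, so every row ends up carrying [value1, value2, delta]. A minimal sketch of that padding/delta step, using made-up counter names and values:

    difftable = {"TaskCounter": {"SPILLED_RECORDS": [120], "SHUFFLE_BYTES": [10, 25]}}

    for group in difftable.values():
        for values in group.values():
            if len(values) == 1:          # counter missing from the second DAG
                values.append(0)
            delta = values[1] - values[0]
            values.append(("+" if delta > 0 else "") + str(delta))

    print(difftable)
    # {'TaskCounter': {'SPILLED_RECORDS': [120, 0, '-120'], 'SHUFFLE_BYTES': [10, 25, '+15']}}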
diff --git a/tez-tools/swimlanes/amlogparser.py b/tez-tools/swimlanes/amlogparser.py
index 16b82bcf5..17fb8f448 100644
--- a/tez-tools/swimlanes/amlogparser.py
+++ b/tez-tools/swimlanes/amlogparser.py
@@ -17,274 +17,311 @@
# under the License.
#
-import sys,re
+import sys
+import re
from itertools import groupby
-from bz2 import BZ2File
-from gzip import GzipFile as GZFile
-try:
- from urllib.request import urlopen
-except:
- from urllib2 import urlopen as urlopen
+import bz2
+import gzip
+from urllib.request import urlopen
+
class AMRawEvent(object):
- def __init__(self, ts, dag, event, args):
- self.ts = ts
- self.dag = dag
- self.event = event
- self.args = args
- def __repr__(self):
- return "%s->%s (%s)" % (self.dag, self.event, self.args)
+ def __init__(self, ts, dag, event, args):
+ self.ts = ts
+ self.dag = dag
+ self.event = event
+ self.args = args
+
+ def __repr__(self):
+ return "%s->%s (%s)" % (self.dag, self.event, self.args)
+
def first(l):
- return (l[:1] or [None])[0]
+ return (l[:1] or [None])[0]
+
def kv_add(d, k, v):
- if(d.has_key(k)):
- oldv = d[k]
- if(type(oldv) is list):
- oldv.append(v)
- else:
- oldv = [oldv, v]
- d[k] = oldv
- else:
- d[k] = v
-
+ if k in d:
+ oldv = d[k]
+ if type(oldv) is list:
+ oldv.append(v)
+ else:
+ oldv = [oldv, v]
+ d[k] = oldv
+ else:
+ d[k] = v
+
+
def csv_kv(args):
- kvs = {};
- pairs = [p.strip() for p in args.split(",")]
- for kv in pairs:
- if(kv.find("=") == -1):
- kv_add(kvs, kv, None)
- elif(kv.find("=") == kv.rfind("=")):
- (k,v) = kv.split("=")
- kv_add(kvs, k, v)
- return kvs
+ kvs = {}
+ pairs = [p.strip() for p in args.split(",")]
+ for kv in pairs:
+ if kv.find("=") == -1:
+ kv_add(kvs, kv, None)
+ elif kv.find("=") == kv.rfind("="):
+ (k, v) = kv.split("=")
+ kv_add(kvs, k, v)
+ return kvs
+
class AppMaster(object):
- def __init__(self, raw):
- self.raw = raw
- self.kvs = csv_kv(raw.args)
- self.name = self.kvs["appAttemptId"]
- self.zero = int(self.kvs["startTime"])
- #self.ready = int(self.kvs["initTime"])
- #self.start = int(self.kvs["appSubmitTime"])
- self.containers = None
- self.dags = None
- def __repr__(self):
- return "[%s started at %d]" % (self.name, self.zero)
+ def __init__(self, raw):
+ self.raw = raw
+ self.kvs = csv_kv(raw.args)
+ self.name = self.kvs["appAttemptId"]
+ self.zero = int(self.kvs["startTime"])
+ # self.ready = int(self.kvs["initTime"])
+ # self.start = int(self.kvs["appSubmitTime"])
+ self.containers = None
+ self.dags = None
+
+ def __repr__(self):
+ return "[%s started at %d]" % (self.name, self.zero)
+
class DummyAppMaster(object):
- """ magic of duck typing """
- def __init__(self, dag):
- self.raw = None
- self.kvs = {}
- self.name = "Appmaster for %s" % dag.name
- self.zero = dag.start
- self.containers = None
- self.dags = None
-
+ """magic of duck typing"""
+
+ def __init__(self, dag):
+ self.raw = None
+ self.kvs = {}
+ self.name = "Appmaster for %s" % dag.name
+ self.zero = dag.start
+ self.containers = None
+ self.dags = None
+
+
class Container(object):
- def __init__(self, raw):
- self.raw = raw
- self.kvs = csv_kv(raw.args)
- self.name = self.kvs["containerId"]
- self.start = int(self.kvs["launchTime"])
- self.stop = -1
- self.status = 0
- self.node =""
- def __repr__(self):
- return "[%s start=%d]" % (self.name, self.start)
+ def __init__(self, raw):
+ self.raw = raw
+ self.kvs = csv_kv(raw.args)
+ self.name = self.kvs["containerId"]
+ self.start = int(self.kvs["launchTime"])
+ self.stop = -1
+ self.status = 0
+ self.node = ""
+
+ def __repr__(self):
+ return "[%s start=%d]" % (self.name, self.start)
+
class DummyContainer(object):
- def __init__(self, attempt):
- self.raw = None
- self.kvs = {}
- self.name = attempt.container
- self.status = 0
- self.start = attempt.start
- self.stop = -1
- self.status = 0
- self.node = None
+ def __init__(self, attempt):
+ self.raw = None
+ self.kvs = {}
+ self.name = attempt.container
+ self.status = 0
+ self.start = attempt.start
+ self.stop = -1
+ self.status = 0
+ self.node = None
+
class DAG(object):
- def __init__(self, raw):
- self.raw = raw
- self.name = raw.dag
- self.kvs = csv_kv(raw.args)
- self.start = (int)(self.kvs["startTime"])
- self.finish = (int)(self.kvs["finishTime"])
- self.duration = (int)(self.kvs["timeTaken"])
- def structure(self, vertexes):
- self.vertexes = [v for v in vertexes if v.dag == self.name]
- def attempts(self):
- for v in self.vertexes:
- for t in v.tasks:
- for a in t.attempts:
- if(a.dag == self.name):
- yield a
- def __repr__(self):
- return "%s (%d+%d)" % (self.name, self.start, self.duration)
+ def __init__(self, raw):
+ self.raw = raw
+ self.name = raw.dag
+ self.kvs = csv_kv(raw.args)
+ self.start = (int)(self.kvs["startTime"])
+ self.finish = (int)(self.kvs["finishTime"])
+ self.duration = (int)(self.kvs["timeTaken"])
+
+ def structure(self, vertexes):
+ self.vertexes = [v for v in vertexes if v.dag == self.name]
+
+ def attempts(self):
+ for v in self.vertexes:
+ for t in v.tasks:
+ for a in t.attempts:
+ if a.dag == self.name:
+ yield a
+
+ def __repr__(self):
+ return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
class Vertex(object):
- def __init__(self, raw):
- self.raw = raw
- self.dag = raw.dag
- self.kvs = csv_kv(raw.args)
- self.name = self.kvs["vertexName"]
- self.initZero = (int)(self.kvs["initRequestedTime"])
- self.init = (int)(self.kvs["initedTime"])
- self.startZero = (int)(self.kvs["startRequestedTime"])
- self.start = (int)(self.kvs["startedTime"])
- self.finish = (int)(self.kvs["finishTime"])
- self.duration = (int)(self.kvs["timeTaken"])
- def structure(self, tasks):
- self.tasks = [t for t in tasks if t.vertex == self.name]
- def __repr__(self):
- return "%s (%d+%d)" % (self.name, self.start, self.duration)
+ def __init__(self, raw):
+ self.raw = raw
+ self.dag = raw.dag
+ self.kvs = csv_kv(raw.args)
+ self.name = self.kvs["vertexName"]
+ self.initZero = (int)(self.kvs["initRequestedTime"])
+ self.init = (int)(self.kvs["initedTime"])
+ self.startZero = (int)(self.kvs["startRequestedTime"])
+ self.start = (int)(self.kvs["startedTime"])
+ self.finish = (int)(self.kvs["finishTime"])
+ self.duration = (int)(self.kvs["timeTaken"])
+
+ def structure(self, tasks):
+ self.tasks = [t for t in tasks if t.vertex == self.name]
+
+ def __repr__(self):
+ return "%s (%d+%d)" % (self.name, self.start, self.duration)
class Task(object):
- def __init__(self, raw):
- self.raw = raw
- self.dag = raw.dag
- self.kvs = csv_kv(raw.args)
- self.vertex = self.kvs["vertexName"]
- self.name = self.kvs["taskId"]
- self.start = (int)(self.kvs["startTime"])
- self.finish = (int)(self.kvs["finishTime"])
- self.duration = (int)(self.kvs["timeTaken"])
- def structure(self, attempts):
- self.attempts = [a for a in attempts if a.task == self.name]
- def __repr__(self):
- return "%s (%d+%d)" % (self.name, self.start, self.duration)
+ def __init__(self, raw):
+ self.raw = raw
+ self.dag = raw.dag
+ self.kvs = csv_kv(raw.args)
+ self.vertex = self.kvs["vertexName"]
+ self.name = self.kvs["taskId"]
+ self.start = (int)(self.kvs["startTime"])
+ self.finish = (int)(self.kvs["finishTime"])
+ self.duration = (int)(self.kvs["timeTaken"])
+
+ def structure(self, attempts):
+ self.attempts = [a for a in attempts if a.task == self.name]
+
+ def __repr__(self):
+ return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
class Attempt(object):
- def __init__(self, pair):
- start = first(filter(lambda a: a.event == "TASK_ATTEMPT_STARTED", pair))
- finish = first(filter(lambda a: a.event == "TASK_ATTEMPT_FINISHED", pair))
- if start is None or finish is None:
- print [start, finish];
- self.raw = finish
- self.kvs = csv_kv(start.args)
- if finish is not None:
- self.dag = finish.dag
- self.kvs.update(csv_kv(finish.args))
- self.finish = (int)(self.kvs["finishTime"])
- self.duration = (int)(self.kvs["timeTaken"])
- self.name = self.kvs["taskAttemptId"]
- self.task = self.name[:self.name.rfind("_")].replace("attempt","task")
- (_, _, amid, dagid, vertexid, taskid, attemptid) = self.name.split("_")
- self.tasknum = int(taskid)
- self.attemptnum = int(attemptid)
- self.vertex = self.kvs["vertexName"]
- self.start = (int)(self.kvs["startTime"])
- self.container = self.kvs["containerId"]
- self.node = self.kvs["nodeId"]
- def __repr__(self):
- return "%s (%d+%d)" % (self.name, self.start, self.duration)
-
+ def __init__(self, pair):
+ # Consuming iterators immediately with list() for Py3 compatibility
+ start = first(list(filter(lambda a: a.event == "TASK_ATTEMPT_STARTED", pair)))
+ finish = first(list(filter(lambda a: a.event == "TASK_ATTEMPT_FINISHED", pair)))
+ if start is None or finish is None:
+ print([start, finish])
+ self.raw = finish
+ self.kvs = csv_kv(start.args)
+ if finish is not None:
+ self.dag = finish.dag
+ self.kvs.update(csv_kv(finish.args))
+ self.finish = (int)(self.kvs["finishTime"])
+ self.duration = (int)(self.kvs["timeTaken"])
+ self.name = self.kvs["taskAttemptId"]
+ self.task = self.name[: self.name.rfind("_")].replace("attempt", "task")
+ (_, _, amid, dagid, vertexid, taskid, attemptid) = self.name.split("_")
+ self.tasknum = int(taskid)
+ self.attemptnum = int(attemptid)
+ self.vertex = self.kvs["vertexName"]
+ self.start = (int)(self.kvs["startTime"])
+ self.container = self.kvs["containerId"]
+ self.node = self.kvs["nodeId"]
+
+ def __repr__(self):
+ return "%s (%d+%d)" % (self.name, self.start, self.duration)
+
def open_file(f):
- if(f.endswith(".gz")):
- return GZFile(f)
- elif(f.endswith(".bz2")):
- return BZ2File(f)
- elif(f.startswith("http://")):
- return urlopen(f)
- return open(f)
-
-class AMLog(object):
- def init(self):
- ID=r'[^\]]*'
- TS=r'[0-9:\-, ]*'
- MAIN_RE=r'^(?P<ts>%(ts)s) [?INFO]? [(?P<thread>%(id)s)] \|?((HistoryEventHandler.criticalEvents)|((org.apache.tez.dag.)?history.HistoryEventHandler))\|?: [HISTORY][DAG:(?P<dag>%(id)s)][Event:(?P<event>%(id)s)]: (?P<args>.*)'
- MAIN_RE = MAIN_RE.replace('[','\[').replace(']','\]')
- MAIN_RE = MAIN_RE % {'ts' : TS, 'id' : ID}
- self.MAIN_RE = re.compile(MAIN_RE)
-
- def __init__(self, f):
- fp = open_file(f)
- self.init()
- self.events = filter(lambda a:a, [self.parse(l.strip()) for l in fp])
-
- def structure(self):
- am = self.appmaster() # this is a copy
- containers = dict([(a.name, a) for a in self.containers()])
- dags = self.dags()
- vertexes = self.vertexes()
- tasks = self.tasks()
- attempts = self.attempts()
- for t in tasks:
- t.structure(attempts)
- for v in vertexes:
- v.structure(tasks)
- for d in dags:
- d.structure(vertexes)
- for a in attempts:
- if containers.has_key(a.container):
- c = containers[a.container]
- c.node = a.node
- else:
- c = DummyContainer(a)
- containers[a.container] = c
- if not am:
- am = DummyAppMaster(first(dags))
- am.containers = containers
- am.dags = dags
- return am
-
- def appmaster(self):
- return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"])
-
- def containers(self):
- containers = [Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED"]
- containermap = dict([(c.name, c) for c in containers])
- for ev in self.events:
- if ev.event == "CONTAINER_STOPPED":
- kvs = csv_kv(ev.args)
- if containermap.has_key(kvs["containerId"]):
- containermap[kvs["containerId"]].stop = int(kvs["stoppedTime"])
- containermap[kvs["containerId"]].status = int(kvs["exitStatus"])
- return containers
-
-
- def dags(self):
- dags = [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"]
- return dags
-
- def vertexes(self):
- """ yes, not vertices """
- vertexes = [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"]
- return vertexes
-
- def tasks(self):
- tasks = [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"]
- return tasks
-
- def attempts(self):
- key = lambda a:a[0]
- value = lambda a:a[1]
- raw = [(csv_kv(ev.args)["taskAttemptId"], ev) for ev in self.events if ev.event == "TASK_ATTEMPT_STARTED" or ev.event == "TASK_ATTEMPT_FINISHED"]
- pairs = groupby(sorted(raw), key = key)
- attempts = [Attempt(map(value,p)) for (k,p) in pairs]
- return attempts
-
- def parse(self, l):
- if(l.find("[HISTORY]") != -1):
- m = self.MAIN_RE.match(l)
- ts = m.group("ts")
- dag = m.group("dag")
- event = m.group("event")
- args = m.group("args")
- return AMRawEvent(ts, dag, event, args)
+ if f.endswith(".gz"):
+ return gzip.open(f, "rt")
+ elif f.endswith(".bz2"):
+ return bz2.open(f, "rt")
+ elif f.startswith("http://"):
+ return urlopen(f)
+ return open(f, "r")
+
+
+class AMLog(object):
+ def init(self):
+ ID = r"[^\]]*"
+ TS = r"[0-9:\-, ]*"
+ MAIN_RE = r"^(?P<ts>%(ts)s) [?INFO]? [(?P<thread>%(id)s)] \|?((HistoryEventHandler.criticalEvents)|((org.apache.tez.dag.)?history.HistoryEventHandler))\|?: [HISTORY][DAG:(?P<dag>%(id)s)][Event:(?P<event>%(id)s)]: (?P<args>.*)"
+ # Fix for SyntaxWarning: using raw strings
+ MAIN_RE = MAIN_RE.replace("[", r"\[").replace("]", r"\]")
+ MAIN_RE = MAIN_RE % {"ts": TS, "id": ID}
+ self.MAIN_RE = re.compile(MAIN_RE)
+
+ def __init__(self, f):
+ fp = open_file(f)
+ self.init()
+ # Filter returns iterator in Py3, list() ensures immediate execution
+ self.events = list(filter(lambda a: a, [self.parse(l.strip()) for l in fp]))
+
+ def structure(self):
+ am = self.appmaster() # this is a copy
+ containers = dict([(a.name, a) for a in self.containers()])
+ dags = self.dags()
+ vertexes = self.vertexes()
+ tasks = self.tasks()
+ attempts = self.attempts()
+ for t in tasks:
+ t.structure(attempts)
+ for v in vertexes:
+ v.structure(tasks)
+ for d in dags:
+ d.structure(vertexes)
+ for a in attempts:
+ if a.container in containers:
+ c = containers[a.container]
+ c.node = a.node
+ else:
+ c = DummyContainer(a)
+ containers[a.container] = c
+ if not am:
+ am = DummyAppMaster(first(dags))
+ am.containers = containers
+ am.dags = dags
+ return am
+
+ def appmaster(self):
+ return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"])
+
+ def containers(self):
+ containers = [
+ Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED"
+ ]
+ containermap = dict([(c.name, c) for c in containers])
+ for ev in self.events:
+ if ev.event == "CONTAINER_STOPPED":
+ kvs = csv_kv(ev.args)
+ if kvs["containerId"] in containermap:
+ containermap[kvs["containerId"]].stop = int(kvs["stoppedTime"])
+ containermap[kvs["containerId"]].status = int(kvs["exitStatus"])
+ return containers
+
+ def dags(self):
+ dags = [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"]
+ return dags
+
+ def vertexes(self):
+ """yes, not vertices"""
+ vertexes = [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"]
+ return vertexes
+
+ def tasks(self):
+ tasks = [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"]
+ return tasks
+
+ def attempts(self):
+ key = lambda a: a[0]
+ value = lambda a: a[1]
+ raw = [
+ (csv_kv(ev.args)["taskAttemptId"], ev)
+ for ev in self.events
+ if ev.event == "TASK_ATTEMPT_STARTED" or ev.event == "TASK_ATTEMPT_FINISHED"
+ ]
+ # FIX: explicitly pass key to sorted() to avoid comparing AMRawEvent objects
+ # which causes TypeError in Python 3
+ pairs = groupby(sorted(raw, key=key), key=key)
+ # Map returns iterator in Py3, list() creates the necessary list
+ attempts = [Attempt(list(map(value, p))) for (k, p) in pairs]
+ return attempts
+
+ def parse(self, l):
+ if l.find("[HISTORY]") != -1:
+ m = self.MAIN_RE.match(l)
+ if m:
+ ts = m.group("ts")
+ dag = m.group("dag")
+ event = m.group("event")
+ args = m.group("args")
+ return AMRawEvent(ts, dag, event, args)
+ return None
+
def main(argv):
- tree = AMLog(argv[0]).structure()
- # AM -> dag -> vertex -> task -> attempt
- # AM -> container
- for d in tree.dags:
- for a in d.attempts():
- print [a.vertex, a.name, a.container, a.start, a.finish]
+ tree = AMLog(argv[0]).structure()
+ # AM -> dag -> vertex -> task -> attempt
+ # AM -> container
+ for d in tree.dags:
+ for a in d.attempts():
+ print([a.vertex, a.name, a.container, a.start, a.finish])
+
if __name__ == "__main__":
- main(sys.argv[1:])
+ main(sys.argv[1:])
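The sorted(raw, key=key) change in attempts() above is the one behavioural fix in amlogparser.py: each attempt id occurs twice (STARTED and FINISHED), so a plain sorted(raw) ties on the id and falls back to comparing the AMRawEvent objects themselves, which Python 3 rejects. A minimal standalone illustration (hypothetical event class, not the parser's own):

    from itertools import groupby

    class Event:
        def __init__(self, name):
            self.name = name

    # Two events share one attempt id, as STARTED/FINISHED pairs do.
    raw = [("attempt_1_0", Event("STARTED")), ("attempt_1_0", Event("FINISHED"))]

    key = lambda pair: pair[0]
    # sorted(raw) would raise TypeError on Python 3 when the ids tie;
    # sorting on the key alone never compares the Event objects.
    for attempt_id, group in groupby(sorted(raw, key=key), key=key):
        print(attempt_id, [ev.name for _, ev in group])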
diff --git a/tez-tools/swimlanes/swimlane.py b/tez-tools/swimlanes/swimlane.py
index 11976daab..f763b871d 100644
--- a/tez-tools/swimlanes/swimlane.py
+++ b/tez-tools/swimlanes/swimlane.py
@@ -17,185 +17,363 @@
# under the License.
#
-import sys,math,os.path
-import StringIO
+import sys
+import io
from amlogparser import AMLog
from getopt import getopt
+
class ColourManager(object):
- def __init__(self):
- # text-printable colours
- self.colours = [
- '#E4F5FC', '#62C2A2', '#E2F2D8', '#A9DDB4', '#E2F6E1', '#D8DAD7', '#BBBDBA', '#FEE6CE', '#FFCF9F',
- '#FDAE69', '#FDE4DD', '#EDE6F2', '#A5BDDB', '#FDE1EE', '#D8B9D8', '#D7DCEC', '#BABDDA', '#FDC5BF',
- '#FC9FB3', '#FDE1D2', '#FBBB9E', '#DBEF9F', '#AADD8E', '#81CDBB', '#C7EDE8', '#96D9C8', '#E3EBF4',
- '#BAD3E5', '#9DBDD9', '#8996C8', '#CEEAC6', '#76CCC6', '#C7E9BE', '#9ED99C', '#71C572', '#EFF1EE',
- '#949693', '#FD8D3D', '#FFF7ED', '#FED3AE', '#FEBB8F', '#FCE9CA', '#FED49B', '#FBBC85', '#FB8E58',
- '#FFEEE8', '#D0D0E8', '#76A9CE', '#FDFFFC', '#E9E2EE', '#64A8D2', '#FAF7FC', '#F6ECF2', '#F8E7F0',
- '#C994C6', '#E063B1', '#ECEDF7', '#DDD9EB', '#9B9BCA', '#FEDFDE', '#F8689F', '#FC9273', '#FC6948',
- '#F6FDB6', '#78C67B', '#EBF9B0', '#C5E9B0', '#40B7C7', '#FDF7BA', '#FFE392', '#FFC34C', '#FF982A']
- self.i = 0
- def next(self):
- self.i += 1
- return self.colours[self.i % len(self.colours)]
+ def __init__(self):
+ # text-printable colours
+ self.colours = [
+ "#E4F5FC",
+ "#62C2A2",
+ "#E2F2D8",
+ "#A9DDB4",
+ "#E2F6E1",
+ "#D8DAD7",
+ "#BBBDBA",
+ "#FEE6CE",
+ "#FFCF9F",
+ "#FDAE69",
+ "#FDE4DD",
+ "#EDE6F2",
+ "#A5BDDB",
+ "#FDE1EE",
+ "#D8B9D8",
+ "#D7DCEC",
+ "#BABDDA",
+ "#FDC5BF",
+ "#FC9FB3",
+ "#FDE1D2",
+ "#FBBB9E",
+ "#DBEF9F",
+ "#AADD8E",
+ "#81CDBB",
+ "#C7EDE8",
+ "#96D9C8",
+ "#E3EBF4",
+ "#BAD3E5",
+ "#9DBDD9",
+ "#8996C8",
+ "#CEEAC6",
+ "#76CCC6",
+ "#C7E9BE",
+ "#9ED99C",
+ "#71C572",
+ "#EFF1EE",
+ "#949693",
+ "#FD8D3D",
+ "#FFF7ED",
+ "#FED3AE",
+ "#FEBB8F",
+ "#FCE9CA",
+ "#FED49B",
+ "#FBBC85",
+ "#FB8E58",
+ "#FFEEE8",
+ "#D0D0E8",
+ "#76A9CE",
+ "#FDFFFC",
+ "#E9E2EE",
+ "#64A8D2",
+ "#FAF7FC",
+ "#F6ECF2",
+ "#F8E7F0",
+ "#C994C6",
+ "#E063B1",
+ "#ECEDF7",
+ "#DDD9EB",
+ "#9B9BCA",
+ "#FEDFDE",
+ "#F8689F",
+ "#FC9273",
+ "#FC6948",
+ "#F6FDB6",
+ "#78C67B",
+ "#EBF9B0",
+ "#C5E9B0",
+ "#40B7C7",
+ "#FDF7BA",
+ "#FFE392",
+ "#FFC34C",
+ "#FF982A",
+ ]
+ self.i = 0
+
+ def next(self):
+ self.i += 1
+ return self.colours[self.i % len(self.colours)]
+
def attempts(tree):
- for d in tree.dags:
- for a in d.attempts():
- yield (a.vertex, a.name, a.container, a.start, a.finish)
+ for d in tree.dags:
+ for a in d.attempts():
+ yield (a.vertex, a.name, a.container, a.start, a.finish)
+
def attrs(args):
- s = ""
- for k in args:
- v = args[k]
- k = k.replace("_","-") # css
- if type(v) is str:
- s += "%s='%s' " % (k,v)
- else:
- s += "%s=%s " % (k,str(v))
- return s
+ s = ""
+ for k in args:
+ v = args[k]
+ k = k.replace("_", "-") # css
+ if type(v) is str:
+ s += "%s='%s' " % (k, v)
+ else:
+ s += "%s=%s " % (k, str(v))
+ return s
+
class SVGHelper(object):
- def __init__(self, w, h, parent=None):
- self.width = w
- self.height = h
- self.parent = parent
- if(not parent):
- self.lines = StringIO.StringIO()
- self.write("""<?xml version="1.0" standalone="no"?>
+ def __init__(self, w, h, parent=None):
+ self.width = w
+ self.height = h
+ self.parent = parent
+ if not parent:
+ self.lines = io.StringIO()
+ self.write("""<?xml version="1.0" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
""")
- else:
- self.lines = parent.lines
- self.write("""<svg xmlns="http://www.w3.org/2000/svg" version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" height="%d" width="%d">""" % (h, w))
- self.write("""
+ else:
+ self.lines = parent.lines
+ self.write(
+ """<svg xmlns="http://www.w3.org/2000/svg" version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" height="%d" width="%d">"""
+ % (h, w)
+ )
+ self.write("""
<script type="text/ecmascript" xlink:href="http://code.jquery.com/jquery-2.1.1.min.js" />
""")
- def line(self, x1, y1, x2, y2, style="stroke: #000", **kwargs):
- self.write("""<line x1="%d" y1="%d" x2="%d" y2="%d" style="%s" %s />""" % (x1, y1, x2, y2, style, attrs(kwargs)))
- def rect(self, left, top, right, bottom, style="", title="", link=None):
- w = (right-left)
- h = (bottom-top)
- if link:
- self.write("<a xlink:href='%s'>" % link)
- self.write("""<rect x="%d" y="%d" width="%d" height="%d" style="%s"><title>%s</title></rect>""" % (left, top, w, h, style, title))
- if link:
- self.write("</a>")
- def text(self, x, y, text, style="", transform=""):
- self.write("""<text x="%d" y="%d" style="%s" transform="%s">%s</text>""" % (x, y, style, transform, text))
- def link(self, x, y, text, link, style=""):
- self.write("<a xlink:href='%s'>" % link)
- self.text(x, y, text, style)
- self.write("</a>")
- def write(self, s):
- self.lines.write(s)
- def flush(self):
- self.write("</svg>")
- if(self.parent):
- self.parent.flush()
- return self.lines.getvalue()
+
+ def line(self, x1, y1, x2, y2, style="stroke: #000", **kwargs):
+ self.write(
+ """<line x1="%d" y1="%d" x2="%d" y2="%d" style="%s" %s />"""
+ % (x1, y1, x2, y2, style, attrs(kwargs))
+ )
+
+ def rect(self, left, top, right, bottom, style="", title="", link=None):
+ w = right - left
+ h = bottom - top
+ if link:
+ self.write("<a xlink:href='%s'>" % link)
+ self.write(
+ """<rect x="%d" y="%d" width="%d" height="%d" style="%s"><title>%s</title></rect>"""
+ % (left, top, w, h, style, title)
+ )
+ if link:
+ self.write("</a>")
+
+ def text(self, x, y, text, style="", transform=""):
+ self.write(
+ """<text x="%d" y="%d" style="%s" transform="%s">%s</text>"""
+ % (x, y, style, transform, text)
+ )
+
+ def link(self, x, y, text, link, style=""):
+ self.write("<a xlink:href='%s'>" % link)
+ self.text(x, y, text, style)
+ self.write("</a>")
+
+ def write(self, s):
+ self.lines.write(s)
+
+ def flush(self):
+ self.write("</svg>")
+ if self.parent:
+ self.parent.flush()
+ return self.lines.getvalue()
+
def usage():
- sys.stderr.write("""
+ sys.stderr.write("""
usage: swimlane.py [-t ms-per-pixel] [-o outputfile] [-f redline-fraction] <log-file>
Input files for this tool can be prepared by "yarn logs -applicationId <application_...> | grep HISTORY".
""")
+
def main(argv):
- (opts, args) = getopt(argv, "o:t:f:")
- out = sys.stdout
- ticks = -1 # precision of 1/tick
- fraction = -1
- for k,v in opts:
- if(k == "-o"):
- out = open(v, "w")
- if(k == "-t"):
- ticks = int(v)
- if(k == "-f"):
- if(int(v) < 100):
- fraction = int(v)/100.0
- if len(args) == 0:
- return usage()
- log = AMLog(args[0]).structure()
- lanes = [c.name for c in sorted(log.containers.values(), key=lambda a: a.start)]
- marginTop = 128
- marginRight = 100;
- laneSize = 24
- y = len(lanes)*laneSize
- items = attempts(log)
- maxx = max([a[4] for a in items])
- if ticks == -1:
- ticks = min(1000, (maxx - log.zero)/2048)
- xdomain = lambda t : (t - log.zero)/ticks
- x = xdomain(maxx)
- svg = SVGHelper(x+2*marginRight+256, y+2*marginTop)
- a = marginTop
- svg.text(x/2, 32, log.name, style="font-size: 32px; text-anchor: middle")
- containerMap = dict(zip(list(lanes), xrange(len(lanes))))
- svg.text(marginRight - 16, marginTop - 32, "Container ID", "text-anchor:end; font-size: 16px;")
- # draw a grid
- for l in lanes:
- a += laneSize
- svg.text(marginRight - 4, a, l, "text-anchor:end; font-size: 16px;")
- svg.line(marginRight, a, marginRight+x, a, "stroke: #ccc")
- for x1 in set(range(0, x, 10*ticks)) | set([x]):
- svg.text(marginRight+x1, marginTop-laneSize/2, "%0.2f s" % ((x1 * ticks)/1000), "text-anchor: middle; font-size: 12px")
- svg.line(marginRight+x1, marginTop-laneSize/2, marginRight+x1, marginTop+y, "stroke: #ddd")
- svg.line(marginRight, marginTop, marginRight+x, marginTop)
- svg.line(marginRight, y+marginTop, marginRight+x, y+marginTop)
- svg.line(marginRight, marginTop, marginRight, y+marginTop)
- svg.line(marginRight+x, marginTop, marginRight+x, y+marginTop)
-
- colourman = ColourManager()
- for c in log.containers.values():
- y1 = marginTop+(containerMap[c.name]*laneSize)
- x1 = marginRight+xdomain(c.start)
- svg.line(x1, y1, x1, y1 + laneSize, style="stroke: green")
- if c.stop > c.start:
- x2 = marginRight+xdomain(c.stop)
- if (c.status == 0):
- svg.line(x2, y1, x2, y1 + laneSize, style="stroke: green")
- else:
- svg.line(x2, y1, x2, y1 + laneSize, style="stroke: red")
- svg.text(x2, y1, "%d" % (c.status), style="text-anchor: right; font-size: 12px; stroke: red", transform="rotate(90, %d, %d)" % (x2, y1))
- svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")
- elif c.stop == -1:
- x2 = marginRight+x
- svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")
- for dag in log.dags:
- x1 = marginRight+xdomain(dag.start)
- svg.line(x1, marginTop-24, x1, marginTop+y, "stroke: black;", stroke_dasharray="8,4")
- x2 = marginRight+xdomain(dag.finish)
- svg.line(x2, marginTop-24, x2, marginTop+y, "stroke: black;", stroke_dasharray="8,4")
- svg.line(x1, marginTop-24, x2, marginTop-24, "stroke: black")
- svg.text((x1+x2)/2, marginTop-32, "%s (%0.1f s)" % (dag.name, (dag.finish-dag.start)/1000.0) , "text-anchor: middle; font-size: 12px;")
- vertexes = set([v.name for v in dag.vertexes])
- colourmap = dict([(v,colourman.next()) for v in list(vertexes)])
- for c in dag.attempts():
- colour = colourmap[c.vertex]
- y1 = marginTop+(containerMap[c.container]*laneSize)+1
- x1 = marginRight+xdomain(c.start)
- x2 = marginRight+xdomain(c.finish)
- y2 = y1 + laneSize - 2
- locality = (c.kvs.has_key("DATA_LOCAL_TASKS") * 1) + (c.kvs.has_key("RACK_LOCAL_TASKS")*2)
- #CompletedLogs may not be present in latest tez logs
- link = c.kvs.get("completedLogs", "")
- svg.rect(x1, y1, x2, y2, title=c.name, style="fill: %s; stroke: #ccc;" % (colour), link=link)
- if locality > 1: # rack-local (no-locality isn't counted)
- svg.rect(x1, y2-4, x2, y2, style="fill: #f00; fill-opacity: 0.5;", link=link)
- if x2 - x1 > 64:
- svg.text((x1+x2)/2, y2-12, "%s (%05d_%d)" % (c.vertex, c.tasknum, c.attemptnum), style="text-anchor: middle; font-size: 9px;")
- else:
- svg.text((x1+x2)/2, y2-12, "%s" % c.vertex, style="text-anchor: middle; font-size: 9px;")
- finishes = sorted([c.finish for c in dag.attempts()])
- if(len(finishes) > 10 and fraction > 0):
- percentX = finishes[int(len(finishes)*fraction)]
- svg.line(marginRight+xdomain(percentX), marginTop, marginRight+xdomain(percentX), y+marginTop, style="stroke: red")
- svg.text(marginRight+xdomain(percentX), y+marginTop+12, "%d%% (%0.1fs)" % (int(fraction*100), (percentX - dag.start)/1000.0), style="font-size:12px; text-anchor: middle")
- out.write(svg.flush())
- out.close()
- print("Output svg is written into: " + str(out))
+ (opts, args) = getopt(argv, "o:t:f:")
+ out = sys.stdout
+ out_filename = "stdout"
+ ticks = -1 # precision of 1/tick
+ fraction = -1
+ for k, v in opts:
+ if k == "-o":
+ out = open(v, "w")
+ out_filename = v
+ if k == "-t":
+ ticks = int(v)
+ if k == "-f":
+ if int(v) < 100:
+ fraction = int(v) / 100.0
+ if len(args) == 0:
+ return usage()
+ log = AMLog(args[0]).structure()
+ lanes = [c.name for c in sorted(log.containers.values(), key=lambda a: a.start)]
+ marginTop = 128
+ marginRight = 100
+ laneSize = 24
+ y = len(lanes) * laneSize
+ items = list(attempts(log))
+ maxx = max([a[4] for a in items])
+ if ticks == -1:
+ ticks = min(1000, (maxx - log.zero) / 2048)
+ xdomain = lambda t: (t - log.zero) / ticks
+ x = xdomain(maxx)
+ svg = SVGHelper(x + 2 * marginRight + 256, y + 2 * marginTop)
+ a = marginTop
+ svg.text(x / 2, 32, log.name, style="font-size: 32px; text-anchor: middle")
+ containerMap = dict(zip(list(lanes), range(len(lanes))))
+ svg.text(
+ marginRight - 16,
+ marginTop - 32,
+ "Container ID",
+ "text-anchor:end; font-size: 16px;",
+ )
+ # draw a grid
+ for l in lanes:
+ a += laneSize
+ svg.text(marginRight - 4, a, l, "text-anchor:end; font-size: 16px;")
+ svg.line(marginRight, a, marginRight + x, a, "stroke: #ccc")
+ for x1 in set(range(0, int(x), int(10 * ticks))) | set([x]):
+ svg.text(
+ marginRight + x1,
+ marginTop - laneSize / 2,
+ "%0.2f s" % ((x1 * ticks) / 1000),
+ "text-anchor: middle; font-size: 12px",
+ )
+ svg.line(
+ marginRight + x1,
+ marginTop - laneSize / 2,
+ marginRight + x1,
+ marginTop + y,
+ "stroke: #ddd",
+ )
+ svg.line(marginRight, marginTop, marginRight + x, marginTop)
+ svg.line(marginRight, y + marginTop, marginRight + x, y + marginTop)
+ svg.line(marginRight, marginTop, marginRight, y + marginTop)
+ svg.line(marginRight + x, marginTop, marginRight + x, y + marginTop)
+
+ colourman = ColourManager()
+ for c in log.containers.values():
+ y1 = marginTop + (containerMap[c.name] * laneSize)
+ x1 = marginRight + xdomain(c.start)
+ svg.line(x1, y1, x1, y1 + laneSize, style="stroke: green")
+ if c.stop > c.start:
+ x2 = marginRight + xdomain(c.stop)
+ if c.status == 0:
+ svg.line(x2, y1, x2, y1 + laneSize, style="stroke: green")
+ else:
+ svg.line(x2, y1, x2, y1 + laneSize, style="stroke: red")
+ svg.text(
+ x2,
+ y1,
+ "%d" % (c.status),
+ style="text-anchor: right; font-size: 12px; stroke: red",
+ transform="rotate(90, %d, %d)" % (x2, y1),
+ )
+ svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")
+ elif c.stop == -1:
+ x2 = marginRight + x
+ svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3")
+ for dag in log.dags:
+ x1 = marginRight + xdomain(dag.start)
+ svg.line(
+ x1,
+ marginTop - 24,
+ x1,
+ marginTop + y,
+ "stroke: black;",
+ stroke_dasharray="8,4",
+ )
+ x2 = marginRight + xdomain(dag.finish)
+ svg.line(
+ x2,
+ marginTop - 24,
+ x2,
+ marginTop + y,
+ "stroke: black;",
+ stroke_dasharray="8,4",
+ )
+ svg.line(x1, marginTop - 24, x2, marginTop - 24, "stroke: black")
+ svg.text(
+ (x1 + x2) / 2,
+ marginTop - 32,
+ "%s (%0.1f s)" % (dag.name, (dag.finish - dag.start) / 1000.0),
+ "text-anchor: middle; font-size: 12px;",
+ )
+ vertexes = set([v.name for v in dag.vertexes])
+ colourmap = dict([(v, colourman.next()) for v in list(vertexes)])
+ for c in dag.attempts():
+ colour = colourmap[c.vertex]
+ y1 = marginTop + (containerMap[c.container] * laneSize) + 1
+ x1 = marginRight + xdomain(c.start)
+ x2 = marginRight + xdomain(c.finish)
+ y2 = y1 + laneSize - 2
+ locality = ("DATA_LOCAL_TASKS" in c.kvs) * 1 + (
+ "RACK_LOCAL_TASKS" in c.kvs
+ ) * 2
+ # CompletedLogs may not be present in latest tez logs
+ link = c.kvs.get("completedLogs", "")
+ svg.rect(
+ x1,
+ y1,
+ x2,
+ y2,
+ title=c.name,
+ style="fill: %s; stroke: #ccc;" % (colour),
+ link=link,
+ )
+ if locality > 1: # rack-local (no-locality isn't counted)
+ svg.rect(
+ x1,
+ y2 - 4,
+ x2,
+ y2,
+ style="fill: #f00; fill-opacity: 0.5;",
+ link=link,
+ )
+ if x2 - x1 > 64:
+ svg.text(
+ (x1 + x2) / 2,
+ y2 - 12,
+ "%s (%05d_%d)" % (c.vertex, c.tasknum, c.attemptnum),
+ style="text-anchor: middle; font-size: 9px;",
+ )
+ else:
+ svg.text(
+ (x1 + x2) / 2,
+ y2 - 12,
+ "%s" % c.vertex,
+ style="text-anchor: middle; font-size: 9px;",
+ )
+ finishes = sorted([c.finish for c in dag.attempts()])
+ if len(finishes) > 10 and fraction > 0:
+ percentX = finishes[int(len(finishes) * fraction)]
+ svg.line(
+ marginRight + xdomain(percentX),
+ marginTop,
+ marginRight + xdomain(percentX),
+ y + marginTop,
+ style="stroke: red",
+ )
+ svg.text(
+ marginRight + xdomain(percentX),
+ y + marginTop + 12,
+ "%d%% (%0.1fs)"
+ % (int(fraction * 100), (percentX - dag.start) / 1000.0),
+ style="font-size:12px; text-anchor: middle",
+ )
+
+ out.write(svg.flush())
+
+ # Do not close sys.stdout as it causes print() to fail afterwards
+ if out is not sys.stdout:
+ out.close()
+
+ print("Output svg is written into: " + str(out_filename))
+
if __name__ == "__main__":
- sys.exit(main(sys.argv[1:]))
+ sys.exit(main(sys.argv[1:]))
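Similarly, the int() casts around range() in swimlane.py's main() are needed because "/" is true division on Python 3, so ticks and the derived x coordinate become floats. A minimal sketch with made-up timestamps:

    maxx, zero = 250000, 0
    ticks = min(1000, (maxx - zero) / 2048)        # 122.07... (a float on Python 3)
    x = (maxx - zero) / ticks                      # also a float
    grid = set(range(0, int(x), int(10 * ticks)))  # range() rejects float arguments
    print(sorted(grid))                            # [0, 1220]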
diff --git a/tez-tools/swimlanes/yarn-swimlanes.sh b/tez-tools/swimlanes/yarn-swimlanes.sh
index 02465b012..216c75733 100755
--- a/tez-tools/swimlanes/yarn-swimlanes.sh
+++ b/tez-tools/swimlanes/yarn-swimlanes.sh
@@ -24,7 +24,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
if [[ -f $APPID ]]; then
echo "Reading yarn logs from local file: $APPID"
- cat "$APPID" | grep HISTORY > "$TMP"
+ grep "HISTORY" "$APPID" > "$TMP"
else
YARN=$(which yarn);
echo "Fetching yarn logs for $APPID"
@@ -32,4 +32,4 @@ else
fi
echo "History was written into $TMP"
-python "$DIR/swimlane.py" -o "$APPID.svg" "$TMP"
\ No newline at end of file
+python3 "$DIR/swimlane.py" -o "$APPID.svg" "$TMP"
diff --git a/tez-tools/tez-log-split/logsplit.py b/tez-tools/tez-log-split/logsplit.py
index 47e17da11..063b7fd90 100644
--- a/tez-tools/tez-log-split/logsplit.py
+++ b/tez-tools/tez-log-split/logsplit.py
@@ -23,6 +23,7 @@ import re
from gzip import GzipFile as GZFile
from getopt import getopt
+
def usage():
sys.stderr.write("""
usage: logsplit.py <log-file>
@@ -30,20 +31,24 @@ usage: logsplit.py <log-file>
Input files for this tool can be prepared by "yarn logs -applicationId <application_...>".
""")
+
def open_file(f):
if f.endswith(".gz"):
return GZFile(f)
return open(f)
+
class AggregatedLog(object):
def __init__(self):
self.in_container = False
self.in_logfile = False
self.current_container_header = None
self.current_container_name = None
- self.current_host_name = None # as read from log line: "hello.my.host.com_8041"
+ self.current_host_name = None  # as read from log line: "hello.my.host.com_8041"
self.current_file = None
- self.HEADER_CONTAINER_RE = re.compile("Container: (container_[a-z0-9_]+) on (.*)")
+ self.HEADER_CONTAINER_RE = re.compile(
+ "Container: (container_[a-z0-9_]+) on (.*)"
+ )
self.HEADER_LAST_ROW_RE = re.compile("^LogContents:$")
self.HEADER_LOG_TYPE_RE = re.compile("^LogType:(.*)")
self.LAST_LOG_LINE_RE = re.compile("^End of LogType:.*")
@@ -72,7 +77,9 @@ class AggregatedLog(object):
self.create_file_in_current_container(file_name)
elif self.HEADER_LAST_ROW_RE.match(line):
self.in_logfile = True
- self.write_to_current_file(self.current_container_header) #for host reference
+ self.write_to_current_file(
+ self.current_container_header
+ ) # for host reference
else:
m = self.HEADER_CONTAINER_RE.match(line)
self.current_container_header = line
@@ -83,12 +90,16 @@ class AggregatedLog(object):
self.start_container_folder()
def start_container_folder(self):
- container_dir = os.path.join(self.output_folder, self.get_current_container_dir_name())
+ container_dir = os.path.join(
+ self.output_folder, self.get_current_container_dir_name()
+ )
if not os.path.exists(container_dir):
os.makedirs(container_dir)
def create_file_in_current_container(self, file_name):
- file_to_be_created = os.path.join(self.output_folder, self.get_current_container_dir_name(), file_name)
+ file_to_be_created = os.path.join(
+ self.output_folder, self.get_current_container_dir_name(), file_name
+ )
file = open(file_to_be_created, "w+")
self.current_file = file
@@ -98,14 +109,18 @@ class AggregatedLog(object):
def get_current_container_dir_name(self):
return os.path.join(self.current_host_name, self.current_container_name)
+
def main(argv):
(opts, args) = getopt(argv, "")
input_file = args[0]
fp = open_file(input_file)
aggregated_log = AggregatedLog()
aggregated_log.process(fp)
- print ("Split application logs was written into folder " + aggregated_log.output_folder)
+ print(
+ "Split application logs was written into folder " + aggregated_log.output_folder
+ )
fp.close()
+
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
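For context on logsplit.py: the splitter walks an aggregated "yarn logs" dump and uses HEADER_CONTAINER_RE to decide when a new per-container section starts. A minimal sketch of that match on a made-up header line:

    import re

    HEADER_CONTAINER_RE = re.compile("Container: (container_[a-z0-9_]+) on (.*)")

    line = "Container: container_1499978510619_0002_01_000003 on hello.my.host.com_8041"
    m = HEADER_CONTAINER_RE.match(line)
    if m:
        container, host = m.group(1), m.group(2)
        print(host, container)  # split log files land under <host>/<container>/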
diff --git a/tez-tools/tez-log-split/tez-log-splitter.sh b/tez-tools/tez-log-split/tez-log-splitter.sh
index 712e499a4..cc102ab72 100755
--- a/tez-tools/tez-log-split/tez-log-splitter.sh
+++ b/tez-tools/tez-log-split/tez-log-splitter.sh
@@ -32,4 +32,4 @@ else
echo "Application log was written into $TMP"
fi
-python "$DIR/logsplit.py" "$TMP"
\ No newline at end of file
+python3 "$DIR/logsplit.py" "$TMP"