Updated Branches: refs/heads/trunk 171c661e7 -> 7ca60b464
add session information to tracing output patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4862 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ca60b46 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ca60b46 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ca60b46 Branch: refs/heads/trunk Commit: 7ca60b464651af3a3ab47b3449594638d8bec83f Parents: 171c661 Author: Jonathan Ellis <[email protected]> Authored: Tue Oct 30 09:38:06 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Oct 30 09:38:06 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh | 40 ++------- pylib/cqlshlib/tracing.py | 71 +++++++++++++++ .../org/apache/cassandra/config/CFMetaData.java | 4 +- src/java/org/apache/cassandra/tracing/Tracing.java | 42 +++------ .../apache/cassandra/tracing/TracingAppender.java | 2 +- 6 files changed, 98 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bffd9ec..2b87c85 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-beta2 + * improve tracing output (CASSANDRA-4852, 4862) * make TRACE verb droppable (CASSANDRA-4672) * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755) * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/bin/cqlsh ---------------------------------------------------------------------- diff --git a/bin/cqlsh b/bin/cqlsh index 8171bb4..586004b 100755 --- a/bin/cqlsh +++ b/bin/cqlsh @@ -106,10 +106,11 @@ if os.path.isdir(cqlshlibdir): sys.path.insert(0, cqlshlibdir) from cqlshlib import cqlhandling, cql3handling, pylexotron -from cqlshlib.displaying import (MAGENTA, RED, BLUE, ANSI_RESET, COLUMN_NAME_COLORS, +from cqlshlib.displaying import (RED, BLUE, ANSI_RESET, COLUMN_NAME_COLORS, FormattedValue, colorme) from cqlshlib.formatting import format_by_type from cqlshlib.util import trim_if_present +from cqlshlib.tracing import print_trace_session CONFIG_FILE = os.path.expanduser(os.path.join('~', '.cqlshrc')) HISTORY = os.path.expanduser(os.path.join('~', '.cqlsh_history')) @@ -974,7 +975,7 @@ class Shell(cmd.Cmd): session_id = UUID(bytes=self.trace_next_query()) result = self.perform_statement_untraced(statement, decoder=None) time.sleep(0.5) # trace writes are async so we wait a little. - self.print_trace(session_id) + print_trace_session(self, self.cursor, session_id) return result else: return self.perform_statement_untraced(statement, decoder=None) @@ -1020,15 +1021,6 @@ class Shell(cmd.Cmd): self.flush_output() return True - def print_trace(self, session_id): - self.writeresult('Tracing session: ', color=MAGENTA, newline=False) - self.writeresult(session_id) - self.writeresult('') - self.perform_statement_untraced("SELECT activity, event_id, source, source_elapsed " - "FROM system_traces.events " - "WHERE session_id = '%s'" % (session_id,), - decoder=TraceSchemaDecoder) - # these next two functions are not guaranteed perfect; just checks if the # statement parses fully according to cqlsh's own understanding of the # grammar. Changes to the language in Cassandra frequently don't get @@ -1082,6 +1074,7 @@ class Shell(cmd.Cmd): def print_result(self, cursor): self.decoding_errors = [] + self.writeresult("") if self.has_static_result_set(cursor): self.print_static_result(cursor) else: @@ -1099,11 +1092,13 @@ class Shell(cmd.Cmd): colnames = [d[0] for d in cursor.description] colnames_t = [(name, self.get_nametype(cursor, n)) for (n, name) in enumerate(colnames)] formatted_names = [self.myformat_colname(name, nametype) for (name, nametype) in colnames_t] - formatted_data = [map(self.myformat_value, row, cursor.column_types) for row in cursor] + formatted_values = [map(self.myformat_value, row, cursor.column_types) for row in cursor] + self.print_formatted_result(formatted_names, formatted_values) + def print_formatted_result(self, formatted_names, formatted_values): # determine column widths widths = [n.displaywidth for n in formatted_names] - for fmtrow in formatted_data: + for fmtrow in formatted_values: for num, col in enumerate(fmtrow): widths[num] = max(widths[num], col.displaywidth) @@ -1113,7 +1108,7 @@ class Shell(cmd.Cmd): self.writeresult('-%s-' % '-+-'.join('-' * w for w in widths)) # print row data - for row in formatted_data: + for row in formatted_values: line = ' | '.join(col.color_rjust(w) for (col, w) in zip(row, widths)) self.writeresult(' ' + line) @@ -2647,23 +2642,6 @@ class ErrorHandlingSchemaDecoder(OverrideableSchemaDecoder): def value_decode_error(self, err, namebytes, valuebytes, expectedtype): return DecodeError(valuebytes, err, expectedtype, colname=namebytes) -class TraceSchemaDecoder(cql.decoders.SchemaDecoder): - def __init__(self, schema): - cql.decoders.SchemaDecoder.__init__(self, schema) - - def decode_metadata_and_type(self, namebytes): - # override event_id columnname and type. - if namebytes == 'event_id': - return u'timestamp', 'timestamp', cql.cqltypes.UTF8Type, cql.cqltypes.UTF8Type - return cql.decoders.SchemaDecoder.decode_metadata_and_type(self, namebytes) - - def decode_value(self, valbytes, vtype, colname): - if colname == 'timestamp': - millis = (UUID(bytes=valbytes).get_time() - 0x01b21dd213814000) / 10000 - s, ms = divmod(millis, 1000) - return time.strftime('%H:%M:%S', time.localtime(s)) + ',' + str(ms).rjust(3, '0') - return cql.decoders.SchemaDecoder.decode_value(self, valbytes, vtype, colname) - def option_with_default(cparser_getter, section, option, default=None): try: return cparser_getter(section, option) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/pylib/cqlshlib/tracing.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/tracing.py b/pylib/cqlshlib/tracing.py new file mode 100644 index 0000000..5897c27 --- /dev/null +++ b/pylib/cqlshlib/tracing.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from cql.cqltypes import UTF8Type, InetAddressType, Int32Type +from cqlshlib.displaying import MAGENTA + +TRACING_KS = 'system_traces' +SESSIONS_CF = 'sessions' +EVENTS_CF = 'events' + +def print_trace_session(shell, cursor, session_id): + rows = fetch_trace_session(cursor, session_id) + names = ['activity', 'timestamp', 'source', 'source_elapsed'] + types = [UTF8Type, UTF8Type, InetAddressType, Int32Type] + + formatted_names = [shell.myformat_colname(name, UTF8Type) for name in names] + formatted_values = [map(shell.myformat_value, row, types) for row in rows] + + shell.writeresult('') + shell.writeresult('Tracing session: ', color=MAGENTA, newline=False) + shell.writeresult(session_id) + shell.writeresult('') + shell.print_formatted_result(formatted_names, formatted_values) + shell.writeresult('') + +def fetch_trace_session(cursor, session_id): + cursor.execute("SELECT request, coordinator, started_at, duration " + "FROM %s.%s " + "WHERE session_id = '%s'" % (TRACING_KS, SESSIONS_CF, session_id)) + (request, coordinator, started_at, duration) = cursor.fetchone() + + cursor.execute("SELECT activity, event_id, source, source_elapsed " + "FROM %s.%s " + "WHERE session_id = '%s'" % (TRACING_KS, EVENTS_CF, session_id)) + events = cursor.fetchall() + + rows = [] + # append header row (from sessions table). + rows.append([request, format_timestamp(started_at), coordinator, 0]) + # append main rows (from events table). + for activity, event_id, source, source_elapsed in events: + rows.append([activity, format_timeuuid(event_id), source, source_elapsed]) + # append footer row (from sessions table). + finished_at = started_at + (duration / 1000000.) + rows.append(['Request complete', format_timestamp(finished_at), coordinator, duration]) + + return rows + +def format_timestamp(value): + return format_time(int(value * 1000)) + +def format_timeuuid(value): + return format_time((value.get_time() - 0x01b21dd213814000) / 10000) + +def format_time(millis): + s, ms = divmod(millis, 1000) + return time.strftime('%H:%M:%S', time.localtime(s)) + ',' + str(ms).rjust(3, '0') http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 1092da2..4ea683e 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -187,8 +187,8 @@ public final class CFMetaData + " coordinator inet," + " request text," + " started_at timestamp," - + " finished_at timestamp," - + " parameters map<text, text>" + + " parameters map<text, text>," + + " duration int" + ") WITH COMMENT='traced sessions'", Tracing.TRACE_KS); public static final CFMetaData TraceEventsCf = compile(15, "CREATE TABLE " + Tracing.EVENTS_CF + " (" http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index f2f8869..7e639df 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -38,10 +38,7 @@ import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.ExpiringColumn; import org.apache.cassandra.db.RowMutation; -import org.apache.cassandra.db.marshal.InetAddressType; -import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.db.marshal.TimeUUIDType; -import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.ByteBufferUtil; @@ -82,32 +79,27 @@ public class Tracing private final Map<UUID, TraceState> sessions = new ConcurrentHashMap<UUID, TraceState>(); - public static void addColumn(ColumnFamily cf, ByteBuffer name, Object value) - { - cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value.toString()), System.currentTimeMillis(), TTL)); - } - public static void addColumn(ColumnFamily cf, ByteBuffer name, InetAddress address) { - cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(address), System.currentTimeMillis(), TTL)); + addColumn(cf, name, ByteBufferUtil.bytes(address)); } public static void addColumn(ColumnFamily cf, ByteBuffer name, int value) { - cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL)); + addColumn(cf, name, ByteBufferUtil.bytes(value)); } public static void addColumn(ColumnFamily cf, ByteBuffer name, long value) { - cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL)); + addColumn(cf, name, ByteBufferUtil.bytes(value)); } public static void addColumn(ColumnFamily cf, ByteBuffer name, String value) { - cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), System.currentTimeMillis(), TTL)); + addColumn(cf, name, ByteBufferUtil.bytes(value)); } - private void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer value) + private static void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer value) { cf.addColumn(new ExpiringColumn(name, value, System.currentTimeMillis(), TTL)); } @@ -185,17 +177,16 @@ public class Tracing } else { - final long finished_at = System.currentTimeMillis(); + final int elapsed = state.elapsed(); final ByteBuffer sessionIdBytes = state.sessionIdBytes; StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() { public void runMayThrow() throws Exception { - ColumnFamily cf = ColumnFamily.create(CFMetaData.TraceSessionsCf); - addColumn(cf, - buildName(CFMetaData.TraceSessionsCf, bytes("finished_at")), - LongType.instance.decompose(finished_at)); + CFMetaData cfMeta = CFMetaData.TraceSessionsCf; + ColumnFamily cf = ColumnFamily.create(cfMeta); + addColumn(cf, buildName(cfMeta, bytes("duration")), elapsed); RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes); mutation.add(cf); StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); @@ -228,16 +219,11 @@ public class Tracing { public void runMayThrow() throws Exception { - ColumnFamily cf = ColumnFamily.create(CFMetaData.TraceSessionsCf); - addColumn(cf, - buildName(CFMetaData.TraceSessionsCf, bytes("coordinator")), - InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress())); - addColumn(cf, - buildName(CFMetaData.TraceSessionsCf, bytes("request")), - UTF8Type.instance.decompose(request)); - addColumn(cf, - buildName(CFMetaData.TraceSessionsCf, bytes("started_at")), - LongType.instance.decompose(started_at)); + CFMetaData cfMeta = CFMetaData.TraceSessionsCf; + ColumnFamily cf = ColumnFamily.create(cfMeta); + addColumn(cf, buildName(cfMeta, bytes("coordinator")), FBUtilities.getBroadcastAddress()); + addColumn(cf, buildName(cfMeta, bytes("request")), request); + addColumn(cf, buildName(cfMeta, bytes("started_at")), started_at); addParameterColumns(cf, parameters); RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes); mutation.add(cf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/src/java/org/apache/cassandra/tracing/TracingAppender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TracingAppender.java b/src/java/org/apache/cassandra/tracing/TracingAppender.java index e971389..4d4a9a0 100644 --- a/src/java/org/apache/cassandra/tracing/TracingAppender.java +++ b/src/java/org/apache/cassandra/tracing/TracingAppender.java @@ -59,7 +59,7 @@ public class TracingAppender extends AppenderSkeleton addColumn(cf, buildName(cfMeta, eventId, bytes("source")), FBUtilities.getBroadcastAddress()); addColumn(cf, buildName(cfMeta, eventId, bytes("thread")), event.getThreadName()); addColumn(cf, buildName(cfMeta, eventId, bytes("source_elapsed")), elapsed); - addColumn(cf, buildName(cfMeta, eventId, bytes("activity")), event.getMessage()); + addColumn(cf, buildName(cfMeta, eventId, bytes("activity")), event.getMessage().toString()); RowMutation mutation = new RowMutation(Tracing.TRACE_KS, state.sessionIdBytes); mutation.add(cf); StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
