Updated Branches:
  refs/heads/trunk 171c661e7 -> 7ca60b464

add session information to tracing output
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4862


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ca60b46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ca60b46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ca60b46

Branch: refs/heads/trunk
Commit: 7ca60b464651af3a3ab47b3449594638d8bec83f
Parents: 171c661
Author: Jonathan Ellis <[email protected]>
Authored: Tue Oct 30 09:38:06 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Tue Oct 30 09:38:06 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 bin/cqlsh                                          |   40 ++-------
 pylib/cqlshlib/tracing.py                          |   71 +++++++++++++++
 .../org/apache/cassandra/config/CFMetaData.java    |    4 +-
 src/java/org/apache/cassandra/tracing/Tracing.java |   42 +++------
 .../apache/cassandra/tracing/TracingAppender.java  |    2 +-
 6 files changed, 98 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bffd9ec..2b87c85 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta2
+ * improve tracing output (CASSANDRA-4852, 4862)
  * make TRACE verb droppable (CASSANDRA-4672)
  * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755)
  * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index 8171bb4..586004b 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -106,10 +106,11 @@ if os.path.isdir(cqlshlibdir):
     sys.path.insert(0, cqlshlibdir)
 
 from cqlshlib import cqlhandling, cql3handling, pylexotron
-from cqlshlib.displaying import (MAGENTA, RED, BLUE, ANSI_RESET, 
COLUMN_NAME_COLORS,
+from cqlshlib.displaying import (RED, BLUE, ANSI_RESET, COLUMN_NAME_COLORS,
                                  FormattedValue, colorme)
 from cqlshlib.formatting import format_by_type
 from cqlshlib.util import trim_if_present
+from cqlshlib.tracing import print_trace_session
 
 CONFIG_FILE = os.path.expanduser(os.path.join('~', '.cqlshrc'))
 HISTORY = os.path.expanduser(os.path.join('~', '.cqlsh_history'))
@@ -974,7 +975,7 @@ class Shell(cmd.Cmd):
             session_id = UUID(bytes=self.trace_next_query())
             result = self.perform_statement_untraced(statement, decoder=None)
             time.sleep(0.5) # trace writes are async so we wait a little.
-            self.print_trace(session_id)
+            print_trace_session(self, self.cursor, session_id)
             return result
         else:
             return self.perform_statement_untraced(statement, decoder=None)
@@ -1020,15 +1021,6 @@ class Shell(cmd.Cmd):
         self.flush_output()
         return True
 
-    def print_trace(self, session_id):
-        self.writeresult('Tracing session: ', color=MAGENTA, newline=False)
-        self.writeresult(session_id)
-        self.writeresult('')
-        self.perform_statement_untraced("SELECT activity, event_id, source, 
source_elapsed "
-                                        "FROM system_traces.events "
-                                        "WHERE session_id = '%s'" % 
(session_id,),
-                                        decoder=TraceSchemaDecoder)
-
     # these next two functions are not guaranteed perfect; just checks if the
     # statement parses fully according to cqlsh's own understanding of the
     # grammar. Changes to the language in Cassandra frequently don't get
@@ -1082,6 +1074,7 @@ class Shell(cmd.Cmd):
     def print_result(self, cursor):
         self.decoding_errors = []
 
+        self.writeresult("")
         if self.has_static_result_set(cursor):
             self.print_static_result(cursor)
         else:
@@ -1099,11 +1092,13 @@ class Shell(cmd.Cmd):
         colnames = [d[0] for d in cursor.description]
         colnames_t = [(name, self.get_nametype(cursor, n)) for (n, name) in 
enumerate(colnames)]
         formatted_names = [self.myformat_colname(name, nametype) for (name, 
nametype) in colnames_t]
-        formatted_data = [map(self.myformat_value, row, cursor.column_types) 
for row in cursor]
+        formatted_values = [map(self.myformat_value, row, cursor.column_types) 
for row in cursor]
+        self.print_formatted_result(formatted_names, formatted_values)
 
+    def print_formatted_result(self, formatted_names, formatted_values):
         # determine column widths
         widths = [n.displaywidth for n in formatted_names]
-        for fmtrow in formatted_data:
+        for fmtrow in formatted_values:
             for num, col in enumerate(fmtrow):
                 widths[num] = max(widths[num], col.displaywidth)
 
@@ -1113,7 +1108,7 @@ class Shell(cmd.Cmd):
         self.writeresult('-%s-' % '-+-'.join('-' * w for w in widths))
 
         # print row data
-        for row in formatted_data:
+        for row in formatted_values:
             line = ' | '.join(col.color_rjust(w) for (col, w) in zip(row, 
widths))
             self.writeresult(' ' + line)
 
@@ -2647,23 +2642,6 @@ class 
ErrorHandlingSchemaDecoder(OverrideableSchemaDecoder):
     def value_decode_error(self, err, namebytes, valuebytes, expectedtype):
         return DecodeError(valuebytes, err, expectedtype, colname=namebytes)
 
-class TraceSchemaDecoder(cql.decoders.SchemaDecoder):
-    def __init__(self, schema):
-        cql.decoders.SchemaDecoder.__init__(self, schema)
-
-    def decode_metadata_and_type(self, namebytes):
-        # override event_id columnname and type.
-        if namebytes == 'event_id':
-            return u'timestamp', 'timestamp', cql.cqltypes.UTF8Type, 
cql.cqltypes.UTF8Type
-        return cql.decoders.SchemaDecoder.decode_metadata_and_type(self, 
namebytes)
-
-    def decode_value(self, valbytes, vtype, colname):
-        if colname == 'timestamp':
-            millis = (UUID(bytes=valbytes).get_time() - 0x01b21dd213814000) / 
10000
-            s, ms = divmod(millis, 1000)
-            return time.strftime('%H:%M:%S', time.localtime(s)) + ',' + 
str(ms).rjust(3, '0')
-        return cql.decoders.SchemaDecoder.decode_value(self, valbytes, vtype, 
colname)
-
 def option_with_default(cparser_getter, section, option, default=None):
     try:
         return cparser_getter(section, option)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/pylib/cqlshlib/tracing.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/tracing.py b/pylib/cqlshlib/tracing.py
new file mode 100644
index 0000000..5897c27
--- /dev/null
+++ b/pylib/cqlshlib/tracing.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from cql.cqltypes import UTF8Type, InetAddressType, Int32Type
+from cqlshlib.displaying import MAGENTA
+
+TRACING_KS = 'system_traces'
+SESSIONS_CF = 'sessions'
+EVENTS_CF = 'events'
+
+def print_trace_session(shell, cursor, session_id):
+    rows  = fetch_trace_session(cursor, session_id)
+    names = ['activity', 'timestamp', 'source', 'source_elapsed']
+    types = [UTF8Type, UTF8Type, InetAddressType, Int32Type]
+
+    formatted_names = [shell.myformat_colname(name, UTF8Type) for name in 
names]
+    formatted_values = [map(shell.myformat_value, row, types) for row in rows]
+
+    shell.writeresult('')
+    shell.writeresult('Tracing session: ', color=MAGENTA, newline=False)
+    shell.writeresult(session_id)
+    shell.writeresult('')
+    shell.print_formatted_result(formatted_names, formatted_values)
+    shell.writeresult('')
+
+def fetch_trace_session(cursor, session_id):
+    cursor.execute("SELECT request, coordinator, started_at, duration "
+                   "FROM %s.%s "
+                   "WHERE session_id = '%s'" % (TRACING_KS, SESSIONS_CF, 
session_id))
+    (request, coordinator, started_at, duration) = cursor.fetchone()
+
+    cursor.execute("SELECT activity, event_id, source, source_elapsed "
+                   "FROM %s.%s "
+                   "WHERE session_id = '%s'" % (TRACING_KS, EVENTS_CF, 
session_id))
+    events = cursor.fetchall()
+
+    rows = []
+    # append header row (from sessions table).
+    rows.append([request, format_timestamp(started_at), coordinator, 0])
+    # append main rows (from events table).
+    for activity, event_id, source, source_elapsed in events:
+        rows.append([activity, format_timeuuid(event_id), source, 
source_elapsed])
+    # append footer row (from sessions table).
+    finished_at = started_at + (duration / 1000000.)
+    rows.append(['Request complete', format_timestamp(finished_at), 
coordinator, duration])
+
+    return rows
+
+def format_timestamp(value):
+    return format_time(int(value * 1000))
+
+def format_timeuuid(value):
+    return format_time((value.get_time() - 0x01b21dd213814000) / 10000)
+
+def format_time(millis):
+    s, ms = divmod(millis, 1000)
+    return time.strftime('%H:%M:%S', time.localtime(s)) + ',' + 
str(ms).rjust(3, '0')

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java 
b/src/java/org/apache/cassandra/config/CFMetaData.java
index 1092da2..4ea683e 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -187,8 +187,8 @@ public final class CFMetaData
                                                                + "  
coordinator inet,"
                                                                + "  request 
text,"
                                                                + "  started_at 
timestamp,"
-                                                               + "  
finished_at timestamp,"
-                                                               + "  parameters 
map<text, text>"
+                                                               + "  parameters 
map<text, text>,"
+                                                               + "  duration 
int"
                                                                + ") WITH 
COMMENT='traced sessions'", Tracing.TRACE_KS);
 
     public static final CFMetaData TraceEventsCf = compile(15, "CREATE TABLE " 
+ Tracing.EVENTS_CF + " ("

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java 
b/src/java/org/apache/cassandra/tracing/Tracing.java
index f2f8869..7e639df 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -38,10 +38,7 @@ import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.marshal.InetAddressType;
-import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -82,32 +79,27 @@ public class Tracing
 
     private final Map<UUID, TraceState> sessions = new ConcurrentHashMap<UUID, 
TraceState>();
 
-    public static void addColumn(ColumnFamily cf, ByteBuffer name, Object 
value)
-    {
-        cf.addColumn(new ExpiringColumn(name, 
ByteBufferUtil.bytes(value.toString()), System.currentTimeMillis(), TTL));
-    }
-
     public static void addColumn(ColumnFamily cf, ByteBuffer name, InetAddress 
address)
     {
-        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(address), 
System.currentTimeMillis(), TTL));
+        addColumn(cf, name, ByteBufferUtil.bytes(address));
     }
 
     public static void addColumn(ColumnFamily cf, ByteBuffer name, int value)
     {
-        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), 
System.currentTimeMillis(), TTL));
+        addColumn(cf, name, ByteBufferUtil.bytes(value));
     }
 
     public static void addColumn(ColumnFamily cf, ByteBuffer name, long value)
     {
-        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), 
System.currentTimeMillis(), TTL));
+        addColumn(cf, name, ByteBufferUtil.bytes(value));
     }
 
     public static void addColumn(ColumnFamily cf, ByteBuffer name, String 
value)
     {
-        cf.addColumn(new ExpiringColumn(name, ByteBufferUtil.bytes(value), 
System.currentTimeMillis(), TTL));
+        addColumn(cf, name, ByteBufferUtil.bytes(value));
     }
 
-    private void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer value)
+    private static void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer 
value)
     {
         cf.addColumn(new ExpiringColumn(name, value, 
System.currentTimeMillis(), TTL));
     }
@@ -185,17 +177,16 @@ public class Tracing
         }
         else
         {
-            final long finished_at = System.currentTimeMillis();
+            final int elapsed = state.elapsed();
             final ByteBuffer sessionIdBytes = state.sessionIdBytes;
 
             StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
             {
                 public void runMayThrow() throws Exception
                 {
-                    ColumnFamily cf = 
ColumnFamily.create(CFMetaData.TraceSessionsCf);
-                    addColumn(cf,
-                              buildName(CFMetaData.TraceSessionsCf, 
bytes("finished_at")),
-                              LongType.instance.decompose(finished_at));
+                    CFMetaData cfMeta = CFMetaData.TraceSessionsCf;
+                    ColumnFamily cf = ColumnFamily.create(cfMeta);
+                    addColumn(cf, buildName(cfMeta, bytes("duration")), 
elapsed);
                     RowMutation mutation = new RowMutation(TRACE_KS, 
sessionIdBytes);
                     mutation.add(cf);
                     StorageProxy.mutate(Arrays.asList(mutation), 
ConsistencyLevel.ANY);
@@ -228,16 +219,11 @@ public class Tracing
         {
             public void runMayThrow() throws Exception
             {
-                ColumnFamily cf = 
ColumnFamily.create(CFMetaData.TraceSessionsCf);
-                addColumn(cf,
-                          buildName(CFMetaData.TraceSessionsCf, 
bytes("coordinator")),
-                          
InetAddressType.instance.decompose(FBUtilities.getBroadcastAddress()));
-                addColumn(cf,
-                          buildName(CFMetaData.TraceSessionsCf, 
bytes("request")),
-                          UTF8Type.instance.decompose(request));
-                addColumn(cf,
-                          buildName(CFMetaData.TraceSessionsCf, 
bytes("started_at")),
-                          LongType.instance.decompose(started_at));
+                CFMetaData cfMeta = CFMetaData.TraceSessionsCf;
+                ColumnFamily cf = ColumnFamily.create(cfMeta);
+                addColumn(cf, buildName(cfMeta, bytes("coordinator")), 
FBUtilities.getBroadcastAddress());
+                addColumn(cf, buildName(cfMeta, bytes("request")), request);
+                addColumn(cf, buildName(cfMeta, bytes("started_at")), 
started_at);
                 addParameterColumns(cf, parameters);
                 RowMutation mutation = new RowMutation(TRACE_KS, 
sessionIdBytes);
                 mutation.add(cf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ca60b46/src/java/org/apache/cassandra/tracing/TracingAppender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TracingAppender.java 
b/src/java/org/apache/cassandra/tracing/TracingAppender.java
index e971389..4d4a9a0 100644
--- a/src/java/org/apache/cassandra/tracing/TracingAppender.java
+++ b/src/java/org/apache/cassandra/tracing/TracingAppender.java
@@ -59,7 +59,7 @@ public class TracingAppender extends AppenderSkeleton
                 addColumn(cf, buildName(cfMeta, eventId, bytes("source")), 
FBUtilities.getBroadcastAddress());
                 addColumn(cf, buildName(cfMeta, eventId, bytes("thread")), 
event.getThreadName());
                 addColumn(cf, buildName(cfMeta, eventId, 
bytes("source_elapsed")), elapsed);
-                addColumn(cf, buildName(cfMeta, eventId, bytes("activity")), 
event.getMessage());
+                addColumn(cf, buildName(cfMeta, eventId, bytes("activity")), 
event.getMessage().toString());
                 RowMutation mutation = new RowMutation(Tracing.TRACE_KS, 
state.sessionIdBytes);
                 mutation.add(cf);
                 StorageProxy.mutate(Arrays.asList(mutation), 
ConsistencyLevel.ANY);

Reply via email to