Updated Branches: refs/heads/trunk d564a2f02 -> d7ff10d82
Separate tracing from Log4J patch by slebresne; reviewed by jbellis for CASSANDRA-4861 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d7ff10d8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d7ff10d8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d7ff10d8 Branch: refs/heads/trunk Commit: d7ff10d8293529b4f30e462ffd08aae5aff4bc97 Parents: d564a2f Author: Sylvain Lebresne <[email protected]> Authored: Thu Nov 8 09:18:57 2012 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Thu Nov 8 09:18:57 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/log4j-server.properties | 10 +-- .../concurrent/DebuggableThreadPoolExecutor.java | 3 - .../org/apache/cassandra/cql3/QueryProcessor.java | 6 + .../apache/cassandra/db/CollationController.java | 17 ++-- .../org/apache/cassandra/db/ColumnFamilyStore.java | 25 ++--- .../org/apache/cassandra/db/ReadVerbHandler.java | 2 +- .../cassandra/db/RowMutationVerbHandler.java | 3 +- src/java/org/apache/cassandra/db/Table.java | 6 +- .../apache/cassandra/db/TruncateVerbHandler.java | 7 +- .../cassandra/db/filter/SliceQueryFilter.java | 3 +- .../apache/cassandra/io/sstable/SSTableReader.java | 11 +- .../org/apache/cassandra/net/MessagingService.java | 2 +- .../cassandra/net/OutboundTcpConnection.java | 7 +- .../apache/cassandra/net/ResponseVerbHandler.java | 10 ++- .../cassandra/service/IndexScanVerbHandler.java | 3 +- .../cassandra/service/RangeSliceVerbHandler.java | 2 +- .../cassandra/service/SnapshotVerbHandler.java | 3 +- .../org/apache/cassandra/service/StorageProxy.java | 12 ++- .../apache/cassandra/thrift/ThriftClientState.java | 1 - .../org/apache/cassandra/tracing/TraceState.java | 51 +++++++++- src/java/org/apache/cassandra/tracing/Tracing.java | 66 +++++++++--- .../apache/cassandra/tracing/TracingAppender.java | 79 --------------- 23 files changed, 170 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1544a7a..1af4353 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.2-rc1 * Don't share slice query filter in CQL3 SelectStatement (CASSANDRA-4928) + * Separate tracing from Log4J (CASSANDRA-4861) 1.2-beta2 * fp rate of 1.0 disables BF entirely; LCS defaults to 1.0 (CASSANDRA-4876) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/conf/log4j-server.properties ---------------------------------------------------------------------- diff --git a/conf/log4j-server.properties b/conf/log4j-server.properties index 6c9d15a..086306e 100644 --- a/conf/log4j-server.properties +++ b/conf/log4j-server.properties @@ -18,13 +18,12 @@ # (%l is slower.) # output messages into a rolling log file as well as stdout -log4j.rootLogger=DEBUG,stdout,R,tracing +log4j.rootLogger=INFO,stdout,R # stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n -log4j.appender.stdout.Threshold=INFO # rolling log file log4j.appender.R=org.apache.log4j.RollingFileAppender @@ -34,13 +33,6 @@ log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n # Edit the next line to point to your logs directory log4j.appender.R.File=/var/log/cassandra/system.log -log4j.appender.R.Threshold=INFO - -log4j.appender.tracing=org.apache.cassandra.tracing.TracingAppender -log4j.appender.tracing.layout=org.apache.log4j.PatternLayout -log4j.appender.tracing.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n -log4j.appender.tracing.Threshold=DEBUG - # Application logging options #log4j.logger.org.apache.cassandra=DEBUG http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java index 9aff0bf..25f15ee 100644 --- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java +++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java @@ -180,10 +180,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor protected void beforeExecute(Thread t, Runnable r) { if (r instanceof TraceSessionWrapper) - { - logger.debug("executing {}", r); ((TraceSessionWrapper) r).setupContext(); - } super.beforeExecute(t, r); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 3342a2c..02b5edc 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MD5Digest; import org.apache.cassandra.utils.SemanticVersion; @@ -126,8 +127,11 @@ public class QueryProcessor throws RequestExecutionException, RequestValidationException { ClientState clientState = queryState.getClientState(); + Tracing.trace("Checking access"); statement.checkAccess(clientState); + Tracing.trace("Validating statement"); statement.validate(clientState); + Tracing.trace("Executing statement"); ResultMessage result = statement.execute(cl, queryState, variables); return result == null ? new ResultMessage.Void() : result; } @@ -236,12 +240,14 @@ public class QueryProcessor private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) throws RequestValidationException { + Tracing.trace("Parsing statement"); ParsedStatement statement = parseStatement(queryStr); // Set keyspace for statement that require login if (statement instanceof CFStatement) ((CFStatement)statement).prepareKeyspace(clientState); + Tracing.trace("Peparing statement"); return statement.prepare(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 21c3a2f..80d0a2c 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.marshal.CounterColumnType; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.CloseableIterator; public class CollationController @@ -79,11 +80,11 @@ public class CollationController : TreeMapBackedSortedColumns.factory(); ColumnFamily container = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(); - logger.debug("Acquiring sstable references"); + Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); try { - logger.debug("Merging memtable contents"); + Tracing.trace("Merging memtable contents"); for (Memtable memtable : view.memtables) { OnDiskAtomIterator iter = filter.getMemtableColumnIterator(memtable); @@ -132,7 +133,7 @@ public class CollationController container.delete(cf); sstablesIterated++; - logger.debug("Merging data from sstable {}", sstable.descriptor.generation); + Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation); while (iter.hasNext()) container.addAtom(iter.next()); } @@ -165,7 +166,7 @@ public class CollationController } }; ColumnFamily returnCF = container.cloneMeShallow(); - logger.debug("Final collate"); + Tracing.trace("Collating all results"); filter.collateOnDiskAtom(returnCF, Collections.singletonList(toCollate), gcBefore); // "hoist up" the requested data into a more recent sstable @@ -173,7 +174,7 @@ public class CollationController && !cfs.isCompactionDisabled() && cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy) { - logger.debug("Defragmenting requested data"); + Tracing.trace("Defragmenting requested data"); RowMutation rm = new RowMutation(cfs.table.name, new Row(filter.key, returnCF.cloneMe())); // skipping commitlog and index updates is fine since we're just de-fragmenting existing data Table.open(rm.getTable()).apply(rm, false, false); @@ -222,14 +223,14 @@ public class CollationController ISortedColumns.Factory factory = mutableColumns ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory() : ArrayBackedSortedColumns.factory(); - logger.debug("Acquiring sstable references"); + Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key); List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(Iterables.size(view.memtables) + view.sstables.size()); ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); try { - logger.debug("Merging memtable contents"); + Tracing.trace("Merging memtable contents"); for (Memtable memtable : view.memtables) { OnDiskAtomIterator iter = filter.getMemtableColumnIterator(memtable); @@ -283,7 +284,7 @@ public class CollationController if (iterators.isEmpty()) return null; - logger.debug("Merging data from {} memtables and sstables", iterators.size()); + Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated); filter.collateOnDiskAtom(returnCF, iterators, gcBefore); // Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly: http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 4664760..382a7f5 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -71,6 +71,7 @@ import org.apache.cassandra.metrics.ColumnFamilyMetrics; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.IndexExpression; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; import static org.apache.cassandra.config.CFMetaData.Caching; @@ -1159,14 +1160,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (cached instanceof RowCacheSentinel) { // Some other read is trying to cache the value, just do a normal non-caching read - logger.debug("Row cache miss (race)"); + Tracing.trace("Row cache miss (race)"); return getTopLevelColumns(filter, Integer.MIN_VALUE, false); } - logger.debug("Row cache hit"); + Tracing.trace("Row cache hit"); return (ColumnFamily) cached; } - logger.debug("Row cache miss"); + Tracing.trace("Row cache miss"); RowCacheSentinel sentinel = new RowCacheSentinel(); boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel); @@ -1354,7 +1355,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore, boolean forCache) { - logger.debug("Executing single-partition query on {}", columnFamily); + Tracing.trace("Executing single-partition query on {}", columnFamily); CollationController controller = new CollationController(this, forCache, filter, gcBefore); ColumnFamily columns = controller.getTopLevelColumns(); metric.updateSSTableIterated(controller.getSstablesIterated()); @@ -1386,9 +1387,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean QueryFilter filter = new QueryFilter(null, new QueryPath(columnFamily, superColumn, null), columnFilter); final ViewFragment view = markReferenced(startWith, stopAt); - if (logger.isDebugEnabled()) - logger.debug(String.format("Executing seq scan across %s sstables for %s..%s", - view.sstables.size(), startWith, stopAt)); + Tracing.trace("Executing seq scan across {} sstables for {}..{}", new Object[]{view.sstables.size(), startWith, stopAt}); try { @@ -1454,7 +1453,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns) { - logger.debug("Executing indexed scan for {}..{}", range.left, range.right); + Tracing.trace("Executing indexed scan for {}..{}", range.left, range.right); return indexManager.search(clause, range, maxResults, dataFilter, maxIsColumns); } @@ -1465,12 +1464,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean int columnsCount = 0; int total = 0, matched = 0; - // disable tracing for the actual scan; otherwise it will log a full trace for each row fetched - // -- since index only contains the key, we have to do a "join" on the main table to get the actual data. - // this could be useful but it obscures the most important information which is the scanned:matched ratio. - TraceState state = Tracing.instance().get(); - Tracing.instance().set(null); - try { while (rowIterator.hasNext() && rows.size() < filter.maxRows() && columnsCount < filter.maxColumns()) @@ -1515,9 +1508,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { rowIterator.close(); - // re-enable tracing - Tracing.instance().set(state); - logger.debug("Scanned {} rows and matched {}", total, matched); + Tracing.trace("Scanned {} rows and matched {}", total, matched); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/db/ReadVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadVerbHandler.java b/src/java/org/apache/cassandra/db/ReadVerbHandler.java index d4af0be..fbd4f9b 100644 --- a/src/java/org/apache/cassandra/db/ReadVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadVerbHandler.java @@ -50,7 +50,7 @@ public class ReadVerbHandler implements IVerbHandler<ReadCommand> MessageOut<ReadResponse> reply = new MessageOut<ReadResponse>(MessagingService.Verb.REQUEST_RESPONSE, getResponse(command, row), ReadResponse.serializer); - logger.debug("Enqueuing response to {}", message.from); + Tracing.trace("Enqueuing response to {}", message.from); MessagingService.instance().sendReply(reply, id, message.from); } catch (IOException ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java index 842c539..c2126f5 100644 --- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.net.*; +import org.apache.cassandra.tracing.Tracing; public class RowMutationVerbHandler implements IVerbHandler<RowMutation> { @@ -54,7 +55,7 @@ public class RowMutationVerbHandler implements IVerbHandler<RowMutation> rm.apply(); WriteResponse response = new WriteResponse(); - logger.debug("Enqueuing response to {}", replyTo); + Tracing.trace("Enqueuing response to {}", replyTo); MessagingService.instance().sendReply(response.createMessage(), id, replyTo); } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index 942b79d..85611de 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -358,7 +358,7 @@ public class Table public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes) { if (!mutation.getTable().equals(Tracing.TRACE_KS)) - logger.debug("Acquiring switchLock"); + Tracing.trace("Acquiring switchLock read lock"); // write the mutation to the commitlog and memtables switchLock.readLock().lock(); @@ -366,7 +366,7 @@ public class Table { if (writeCommitLog) { - logger.debug("Appending to commitlog"); + Tracing.trace("Appending to commitlog"); CommitLog.instance.add(mutation); } @@ -380,7 +380,7 @@ public class Table continue; } - logger.debug("Adding to memtable"); + Tracing.trace("Adding to {} memtable", cf.metadata().cfName); cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, true) : SecondaryIndexManager.nullUpdater); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/db/TruncateVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java index ea9ef14..ceb732d 100644 --- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java +++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java @@ -24,6 +24,7 @@ import org.apache.cassandra.io.FSError; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tracing.Tracing; public class TruncateVerbHandler implements IVerbHandler<Truncation> { @@ -32,7 +33,7 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation> public void doVerb(MessageIn<Truncation> message, String id) { Truncation t = message.payload; - logger.debug("Applying {}", t); + Tracing.trace("Applying truncation of {}.{}", t.keyspace, t.columnFamily); try { ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily); @@ -46,10 +47,10 @@ public class TruncateVerbHandler implements IVerbHandler<Truncation> if (FSError.findNested(e) != null) throw FSError.findNested(e); } - logger.debug("Truncate operation succeeded at this host"); + Tracing.trace("Enqueuing response to truncate operation to {}", message.from); TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true); - logger.debug("{} applied. Enqueuing response to {}@{} ", new Object[]{ t, id, message.from }); + logger.trace("{} applied. Enqueuing response to {}@{} ", new Object[]{ t, id, message.from }); MessagingService.instance().sendReply(response.createMessage(), id, message.from); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 7d90302..7ef7977 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -36,6 +36,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tracing.Tracing; public class SliceQueryFilter implements IDiskAtomFilter { @@ -160,7 +161,7 @@ public class SliceQueryFilter implements IDiskAtomFilter container.addColumn(column); } - logger.debug("Read {} live cells and {} tombstoned", columnCounter.live(), columnCounter.ignored()); + Tracing.trace("Read {} live cells and {} tombstoned", columnCounter.live(), columnCounter.ignored()); } public int getLiveCount(ColumnFamily cf) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index bc37d67..6a9e8d2 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -48,6 +48,7 @@ import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.util.*; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR; @@ -745,7 +746,7 @@ public class SSTableReader extends SSTable if (cachedPosition != null) { logger.trace("Cache hit for {} -> {}", cacheKey, cachedPosition); - logger.debug("Key cache hit for sstable {}", descriptor.generation); + Tracing.trace("Key cache hit for sstable {}", descriptor.generation); return cachedPosition; } } @@ -764,7 +765,7 @@ public class SSTableReader extends SSTable } else { - logger.debug("Index sample allows skipping sstable {}", descriptor.generation); + Tracing.trace("Index sample allows skipping sstable {}", descriptor.generation); return null; } } @@ -805,7 +806,7 @@ public class SSTableReader extends SSTable exactMatch = (comparison == 0); if (v < 0) { - logger.debug("Partition index lookup allows skipping sstable {}", descriptor.generation); + Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation); return null; } } @@ -834,7 +835,7 @@ public class SSTableReader extends SSTable } if (op == Operator.EQ && updateCacheAndStats) bloomFilterTracker.addTruePositive(); - logger.debug("Partition index lookup complete for sstable {}", descriptor.generation); + Tracing.trace("Partition index lookup complete for sstable {}", descriptor.generation); return indexEntry; } @@ -854,7 +855,7 @@ public class SSTableReader extends SSTable if (op == Operator.EQ && updateCacheAndStats) bloomFilterTracker.addFalsePositive(); - logger.debug("Partition index lookup complete (bloom filter false positive) {}", descriptor.generation); + Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation); return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 553301d..d5aae5c 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -701,7 +701,7 @@ public final class MessagingService implements MessagingServiceMBean public void receive(MessageIn message, String id, long timestamp) { Tracing.instance().initializeFromMessage(message); - logger.debug("Messsage received from {}", message.from); + Tracing.trace("Message received from {}", message.from); message = SinkManager.processInboundMessage(message, id); if (message == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index f79e4a0..ebb6ade 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; @@ -172,9 +173,9 @@ public class OutboundTcpConnection extends Thread if (sessionBytes != null) { UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes)); - Tracing.instance().continueExistingSession(sessionId); - logger.debug("Sending message to {}", poolReference.endPoint()); - Tracing.instance().maybeStopNonlocalSession(sessionId); + TraceState state = Tracing.instance().get(sessionId); + state.trace("Sending message to {}", poolReference.endPoint()); + Tracing.instance().stopIfNonLocal(state); } write(qm.message, qm.id, qm.timestamp, out, targetVersion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 5b51058..d0931c3 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -20,6 +20,8 @@ package org.apache.cassandra.net; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.tracing.Tracing; + public class ResponseVerbHandler implements IVerbHandler { private static final Logger logger = LoggerFactory.getLogger( ResponseVerbHandler.class ); @@ -30,7 +32,9 @@ public class ResponseVerbHandler implements IVerbHandler CallbackInfo callbackInfo = MessagingService.instance().removeRegisteredCallback(id); if (callbackInfo == null) { - logger.debug("Callback already removed for {}", id); + String msg = "Callback already removed for {} (from {})"; + logger.debug(msg, id, message.from); + Tracing.trace(msg, id, message.from); return; } @@ -39,12 +43,12 @@ public class ResponseVerbHandler implements IVerbHandler if (cb instanceof IAsyncCallback) { - logger.debug("Processing response from {}", message.from); + Tracing.trace("Processing response from {}", message.from); ((IAsyncCallback) cb).response(message); } else { - logger.debug("Processing result from {}", message.from); + Tracing.trace("Processing result from {}", message.from); ((IAsyncResult) cb).result(message); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java index fa4aabb..5bd9876 100644 --- a/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java +++ b/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java @@ -27,6 +27,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.tracing.Tracing; @Deprecated // 1.1 implements index scan with RangeSliceVerb instead public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand> @@ -44,7 +45,7 @@ public class IndexScanVerbHandler implements IVerbHandler<IndexScanCommand> command.index_clause.count, ThriftValidation.asIFilter(command.predicate, cfs.getComparator())); RangeSliceReply reply = new RangeSliceReply(rows); - logger.debug("Enqueuing response to {}", message.from); + Tracing.trace("Enqueuing response to {}", message.from); MessagingService.instance().sendReply(reply.createMessage(), id, message.from); } catch (Exception ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java index 8727005..ef7beaa 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java +++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java @@ -56,7 +56,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand> throw new RuntimeException("Cannot service reads while bootstrapping!"); } RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload)); - logger.debug("Enqueuing response to {}", message.from); + Tracing.trace("Enqueuing response to {}", message.from); MessagingService.instance().sendReply(reply.createMessage(), id, message.from); } catch (Exception ex) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java index f0f814d..f15e8c5 100644 --- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java +++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java @@ -26,6 +26,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tracing.Tracing; public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> { @@ -38,7 +39,7 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> Table.open(command.keyspace).clearSnapshot(command.snapshot_name); else Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name); - logger.debug("Enqueuing response to snapshot request {} to {} ", command.snapshot_name, message.from); + Tracing.trace("Enqueuing response to snapshot request {} to {}", command.snapshot_name, message.from); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.REQUEST_RESPONSE), id, message.from); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 4b700be..bd2c289 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -56,6 +56,7 @@ import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.metrics.ClientRequestMetrics; import org.apache.cassandra.net.*; +import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -170,7 +171,7 @@ public class StorageProxy implements StorageProxyMBean public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, OverloadedException, WriteTimeoutException { - logger.debug("Determining replicas for mutation"); + Tracing.trace("Determining replicas for mutation"); logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); @@ -212,17 +213,20 @@ public class StorageProxy implements StorageProxyMBean mstrings.add(mutation.toString(true)); logger.debug("Write timeout {} for one (or more) of: {}", ex.toString(), mstrings); } + Tracing.trace("Write timeout"); throw ex; } catch (UnavailableException e) { writeMetrics.unavailables.mark(); ClientRequestMetrics.writeUnavailables.inc(); + Tracing.trace("Unavailable"); throw e; } catch (OverloadedException e) { ClientRequestMetrics.writeUnavailables.inc(); + Tracing.trace("Overloaded"); throw e; } catch (IOException e) @@ -248,7 +252,7 @@ public class StorageProxy implements StorageProxyMBean public static void mutateAtomically(Collection<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, OverloadedException, WriteTimeoutException { - logger.debug("Determining replicas for atomic batch"); + Tracing.trace("Determining replicas for atomic batch"); long startTime = System.nanoTime(); logger.trace("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); @@ -281,12 +285,14 @@ public class StorageProxy implements StorageProxyMBean { writeMetrics.unavailables.mark(); ClientRequestMetrics.writeUnavailables.inc(); + Tracing.trace("Unavailable"); throw e; } catch (WriteTimeoutException e) { writeMetrics.timeouts.mark(); ClientRequestMetrics.writeTimeouts.inc(); + Tracing.trace("Write timeout"); throw e; } finally @@ -1078,7 +1084,7 @@ public class StorageProxy implements StorageProxyMBean public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level) throws IOException, UnavailableException, ReadTimeoutException { - logger.debug("Determining replicas to query"); + Tracing.trace("Determining replicas to query"); logger.trace("Command/ConsistencyLevel is {}/{}", command.toString(), consistency_level); long startTime = System.nanoTime(); List<Row> rows; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/thrift/ThriftClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftClientState.java b/src/java/org/apache/cassandra/thrift/ThriftClientState.java index 2b1a3de..632f948 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftClientState.java +++ b/src/java/org/apache/cassandra/thrift/ThriftClientState.java @@ -24,7 +24,6 @@ import org.apache.cassandra.cql.CQLStatement; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index e305471..e60998f 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -19,12 +19,22 @@ package org.apache.cassandra.tracing; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; import com.google.common.base.Stopwatch; +import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.utils.*; /** * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an @@ -56,4 +66,43 @@ public class TraceState long elapsed = watch.elapsedTime(TimeUnit.MICROSECONDS); return elapsed < Integer.MAX_VALUE ? (int) elapsed : Integer.MAX_VALUE; } + + public void trace(String format, Object arg) + { + trace(MessageFormatter.format(format, arg).getMessage()); + } + + public void trace(String format, Object arg1, Object arg2) + { + trace(MessageFormatter.format(format, arg1, arg2).getMessage()); + } + + public void trace(String format, Object[] args) + { + trace(MessageFormatter.arrayFormat(format, args).getMessage()); + } + + public void trace(final String message) + { + final int elapsed = elapsed(); + final ByteBuffer eventId = ByteBufferUtil.bytes(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress())); + + final String threadName = Thread.currentThread().getName(); + + StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + CFMetaData cfMeta = CFMetaData.TraceEventsCf; + ColumnFamily cf = ColumnFamily.create(cfMeta); + Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress()); + Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName); + Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed); + Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message); + RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes); + mutation.add(cf); + StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + } + }); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/tracing/Tracing.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index 7e639df..17241b9 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -151,16 +150,8 @@ public class Tracing return sessionId; } - /** - * Removes the state data but does not log it as complete. - * For use by replica nodes, after replying to the master. - * - * Note: checking that the session exists is the job of the caller. - */ - public void maybeStopNonlocalSession(UUID sessionId) + public void stopIfNonLocal(TraceState state) { - TraceState state = sessions.get(sessionId); - assert state != null; if (!state.isLocallyOwned) sessions.remove(state.sessionId); } @@ -203,6 +194,11 @@ public class Tracing return state.get(); } + public TraceState get(UUID sessionId) + { + return sessions.get(sessionId); + } + public void set(final TraceState tls) { state.set(tls); @@ -260,11 +256,51 @@ public class Tracing state.set(ts); } - /** - * Activate @param sessionId representing a session we've already seen - */ - public void continueExistingSession(UUID sessionId) + public static void trace(String message) { - state.set(sessions.get(sessionId)); + if (Tracing.instance() == null) // instance might not be built at the time this is called + return; + + final TraceState state = Tracing.instance().get(); + if (state == null) // inline isTracing to avoid implicit two calls to state.get() + return; + + state.trace(message); + } + + public static void trace(String format, Object arg) + { + if (Tracing.instance() == null) // instance might not be built at the time this is called + return; + + final TraceState state = Tracing.instance().get(); + if (state == null) // inline isTracing to avoid implicit two calls to state.get() + return; + + state.trace(format, arg); + } + + public static void trace(String format, Object arg1, Object arg2) + { + if (Tracing.instance() == null) // instance might not be built at the time this is called + return; + + final TraceState state = Tracing.instance().get(); + if (state == null) // inline isTracing to avoid implicit two calls to state.get() + return; + + state.trace(format, arg1, arg2); + } + + public static void trace(String format, Object[] args) + { + if (Tracing.instance() == null) // instance might not be built at the time this is called + return; + + final TraceState state = Tracing.instance().get(); + if (state == null) // inline isTracing to avoid implicit two calls to state.get() + return; + + state.trace(format, args); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d7ff10d8/src/java/org/apache/cassandra/tracing/TracingAppender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TracingAppender.java b/src/java/org/apache/cassandra/tracing/TracingAppender.java deleted file mode 100644 index ff8fc62..0000000 --- a/src/java/org/apache/cassandra/tracing/TracingAppender.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.tracing; - -import static org.apache.cassandra.tracing.Tracing.*; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.RowMutation; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.WrappedRunnable; -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.spi.LoggingEvent; - -public class TracingAppender extends AppenderSkeleton -{ - protected void append(final LoggingEvent event) - { - if (Tracing.instance() == null) // instance might not be built at the time this is called - return; - - final TraceState state = Tracing.instance().get(); - if (state == null) // inline isTracing to avoid implicit two calls to state.get() - return; - - final int elapsed = state.elapsed(); - final String threadName = event.getThreadName(); - final ByteBuffer eventId = ByteBufferUtil.bytes(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress())); - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() - { - public void runMayThrow() throws Exception - { - CFMetaData cfMeta = CFMetaData.TraceEventsCf; - ColumnFamily cf = ColumnFamily.create(cfMeta); - addColumn(cf, buildName(cfMeta, eventId, bytes("source")), FBUtilities.getBroadcastAddress()); - addColumn(cf, buildName(cfMeta, eventId, bytes("thread")), threadName); - addColumn(cf, buildName(cfMeta, eventId, bytes("source_elapsed")), elapsed); - addColumn(cf, buildName(cfMeta, eventId, bytes("activity")), event.getMessage().toString()); - RowMutation mutation = new RowMutation(Tracing.TRACE_KS, state.sessionIdBytes); - mutation.add(cf); - StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); - } - }); - } - - public void close() - { - } - - public boolean requiresLayout() - { - return false; - } -}
