Tracing should log write failure rather than raw exceptions patch by jbellis; reviewed by ayeschenko for CASSANDRA-6133
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/64890d86 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/64890d86 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/64890d86 Branch: refs/heads/trunk Commit: 64890d86d87ec527b2f7fac2bc4d80712c290268 Parents: 672131d Author: Jonathan Ellis <[email protected]> Authored: Wed Oct 2 10:35:26 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Oct 2 10:35:26 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/KSMetaData.java | 2 +- .../org/apache/cassandra/db/RowMutation.java | 5 +++ .../apache/cassandra/tracing/TraceState.java | 16 ++++---- .../org/apache/cassandra/tracing/Tracing.java | 43 ++++++++++++++------ 5 files changed, 45 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/64890d86/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6a695e1..5267709 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.11 + * Tracing should log write failure rather than raw exceptions (CASSANDRA-6133) * lock access to TM.endpointToHostIdMap (CASSANDRA-6103) * Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078) * Start MeteredFlusher earlier to prevent OOM during CL replay (CASSANDRA-6087) http://git-wip-us.apache.org/repos/asf/cassandra/blob/64890d86/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java index b92f9a5..79dff35 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ 
b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -99,7 +99,7 @@ public final class KSMetaData public static KSMetaData traceKeyspace() { List<CFMetaData> cfDefs = Arrays.asList(CFMetaData.TraceSessionsCf, CFMetaData.TraceEventsCf); - return new KSMetaData(Tracing.TRACE_KS, SimpleStrategy.class, ImmutableMap.of("replication_factor", "1"), true, cfDefs); + return new KSMetaData(Tracing.TRACE_KS, SimpleStrategy.class, ImmutableMap.of("replication_factor", "2"), true, cfDefs); } public static KSMetaData testMetadata(String name, Class<? extends AbstractReplicationStrategy> strategyClass, Map<String, String> strategyOptions, CFMetaData... cfDefs) http://git-wip-us.apache.org/repos/asf/cassandra/blob/64890d86/src/java/org/apache/cassandra/db/RowMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java index 69f4a5f..5000d79 100644 --- a/src/java/org/apache/cassandra/db/RowMutation.java +++ b/src/java/org/apache/cassandra/db/RowMutation.java @@ -56,6 +56,11 @@ public class RowMutation implements IMutation this(table, key, new HashMap<UUID, ColumnFamily>()); } + public RowMutation(String keyspaceName, ByteBuffer key, ColumnFamily cf) + { + this(keyspaceName, key, Collections.singletonMap(cf.id(), cf)); + } + public RowMutation(String table, Row row) { this(table, row.key.key); http://git-wip-us.apache.org/repos/asf/cassandra/blob/64890d86/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java index 4d52f8f..326b351 100644 --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@ -19,7 +19,6 @@ package org.apache.cassandra.tracing; import java.net.InetAddress; import 
java.nio.ByteBuffer; -import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -30,10 +29,11 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.RowMutation; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.WrappedRunnable; /** * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an @@ -91,7 +91,7 @@ public class TraceState StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() { - public void runMayThrow() throws Exception + public void runMayThrow() { CFMetaData cfMeta = CFMetaData.TraceEventsCf; ColumnFamily cf = ColumnFamily.create(cfMeta); @@ -99,10 +99,8 @@ public class TraceState Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName); if (elapsed >= 0) Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed); - Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message); - RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes); - mutation.add(cf); - StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message); + Tracing.mutateWithCatch(new RowMutation(Tracing.TRACE_KS, sessionIdBytes, cf)); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/64890d86/src/java/org/apache/cassandra/tracing/Tracing.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java index c692436..a232293 100644 --- a/src/java/org/apache/cassandra/tracing/Tracing.java +++ b/src/java/org/apache/cassandra/tracing/Tracing.java @@ -38,13 +38,15 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.ExpiringColumn; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.marshal.TimeUUIDType; +import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; -import org.apache.cassandra.utils.WrappedRunnable; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -171,16 +173,14 @@ public class Tracing final int elapsed = state.elapsed(); final ByteBuffer sessionIdBytes = state.sessionIdBytes; - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + StageManager.getStage(Stage.TRACING).execute(new Runnable() { - public void runMayThrow() throws Exception + public void run() { CFMetaData cfMeta = CFMetaData.TraceSessionsCf; ColumnFamily cf = ColumnFamily.create(cfMeta); addColumn(cf, buildName(cfMeta, bytes("duration")), elapsed); - RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes); - mutation.add(cf); - StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + mutateWithCatch(new RowMutation(TRACE_KS, sessionIdBytes, cf)); } }); @@ -211,19 +211,17 @@ public class Tracing final long started_at = System.currentTimeMillis(); final ByteBuffer sessionIdBytes = 
state.get().sessionIdBytes; - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + StageManager.getStage(Stage.TRACING).execute(new Runnable() { - public void runMayThrow() throws Exception + public void run() { CFMetaData cfMeta = CFMetaData.TraceSessionsCf; ColumnFamily cf = ColumnFamily.create(cfMeta); addColumn(cf, buildName(cfMeta, bytes("coordinator")), FBUtilities.getBroadcastAddress()); addColumn(cf, buildName(cfMeta, bytes("request")), request); addColumn(cf, buildName(cfMeta, bytes("started_at")), started_at); addParameterColumns(cf, parameters); - RowMutation mutation = new RowMutation(TRACE_KS, sessionIdBytes); - mutation.add(cf); - StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + mutateWithCatch(new RowMutation(TRACE_KS, sessionIdBytes, cf)); } }); } @@ -306,4 +304,26 @@ public class Tracing state.trace(format, args); } + + static void mutateWithCatch(RowMutation mutation) + { + try + { + StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY); + } + catch (UnavailableException e) + { + // should never happen; ANY does not throw UAE + throw new AssertionError(e); + } + catch (WriteTimeoutException e) + { + // should never happen; ANY does not throw WTE + throw new AssertionError(e); + } + catch (OverloadedException e) + { + logger.warn("Too many nodes are overloaded to save trace events"); + } + } }
