Merge branch 'cassandra-3.9' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bdaa53de Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bdaa53de Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bdaa53de Branch: refs/heads/trunk Commit: bdaa53de4ed938e451cba5cf2df7fa66ef00c153 Parents: cf68431 8e775ea Author: Stefania Alborghetti <[email protected]> Authored: Fri Jul 29 15:30:57 2016 +0800 Committer: Stefania Alborghetti <[email protected]> Committed: Fri Jul 29 15:31:21 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh.py | 2 +- .../cassandra/concurrent/StageManager.java | 41 ++++---------- .../cassandra/tracing/ExpiredTraceState.java | 10 ++++ .../apache/cassandra/tracing/TraceState.java | 8 +++ .../cassandra/tracing/TraceStateImpl.java | 59 +++++++++++++++++++- .../apache/cassandra/tracing/TracingImpl.java | 49 ++++++++++++---- .../apache/cassandra/tracing/TracingTest.java | 3 + 8 files changed, 128 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/bin/cqlsh.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/concurrent/StageManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/concurrent/StageManager.java index badc527,64abf00..84b8da6 --- a/src/java/org/apache/cassandra/concurrent/StageManager.java +++ b/src/java/org/apache/cassandra/concurrent/StageManager.java @@@ -59,7 -59,7 +59,7 @@@ public class StageManage stages.put(Stage.TRACING, tracingExecutor()); } -- private static ExecuteOnlyExecutor tracingExecutor() ++ private static LocalAwareExecutorService tracingExecutor() { RejectedExecutionHandler reh = new RejectedExecutionHandler() { @@@ -68,13 -68,13 +68,13 @@@ MessagingService.instance().incrementDroppedMessages(MessagingService.Verb._TRACE); } }; -- return new ExecuteOnlyExecutor(1, -- 1, -- KEEPALIVE, -- TimeUnit.SECONDS, -- new ArrayBlockingQueue<Runnable>(1000), -- new NamedThreadFactory(Stage.TRACING.getJmxName()), -- reh); ++ return new TracingExecutor(1, ++ 1, ++ KEEPALIVE, ++ TimeUnit.SECONDS, ++ new ArrayBlockingQueue<Runnable>(1000), ++ new NamedThreadFactory(Stage.TRACING.getJmxName()), ++ reh); } private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads) @@@ -112,13 -112,17 +112,12 @@@ } } - public final static Runnable NO_OP_TASK = () -> {}; - /** -- * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the - * tracing stage. See CASSANDRA-1123 for background. - * tracing stage. See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow - * a final wait on pending trace events since typically the tracing executor is single-threaded, see - * CASSANDRA-11465. ++ * The executor used for tracing. */ -- private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService ++ private static class TracingExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService { -- public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ++ public TracingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @@@ -133,23 -137,28 +132,5 @@@ { execute(command); } -- -- @Override -- public Future<?> submit(Runnable task) -- { - if (task.equals(NO_OP_TASK)) - { - assert getMaximumPoolSize() == 1 : "Cannot wait for pending tasks if running more than 1 thread"; - return super.submit(task); - } -- throw new UnsupportedOperationException(); -- } -- -- @Override -- public <T> Future<T> submit(Runnable task, T result) -- { -- throw new UnsupportedOperationException(); -- } -- -- @Override -- public <T> Future<T> submit(Callable<T> task) -- { -- throw new UnsupportedOperationException(); -- } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/ExpiredTraceState.java index fbe2c33,bc8d5dd..9230d38 --- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java +++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java @@@ -42,4 -42,9 +42,14 @@@ class ExpiredTraceState extends TraceSt { delegate.traceImpl(message); } + + protected void waitForPendingEvents() + { + delegate.waitForPendingEvents(); + } ++ ++ TraceState getDelegate() ++ { ++ return delegate; ++ } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/TraceState.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceState.java index 5365d09,ec2bc9e..b4eff6b --- a/src/java/org/apache/cassandra/tracing/TraceState.java +++ b/src/java/org/apache/cassandra/tracing/TraceState.java @@@ -179,6 -181,8 +181,11 @@@ public abstract class TraceState implem protected abstract void traceImpl(String message); - protected abstract void waitForPendingEvents(); ++ protected void waitForPendingEvents() ++ { ++ // if tracing events are asynchronous, then you can use this method to wait for them to complete ++ } + public boolean acquireReference() { while (true) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/TraceStateImpl.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TraceStateImpl.java index 113ebb7,e2d3a68..55e8389 --- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@@ -19,7 -19,12 +19,16 @@@ package org.apache.cassandra.tracing import java.net.InetAddress; import java.util.Collections; ++import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; ++import java.util.concurrent.CompletableFuture; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.Future; + import java.util.concurrent.TimeUnit; ++import java.util.concurrent.TimeoutException; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@@ -35,6 -41,10 +45,12 @@@ import org.apache.cassandra.utils.Wrapp */ public class TraceStateImpl extends TraceState { + private static final Logger logger = LoggerFactory.getLogger(TraceStateImpl.class); + private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS = + Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "1")); + ++ private final Set<Future<?>> pendingFutures = ConcurrentHashMap.newKeySet(); ++ public TraceStateImpl(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType) { super(coordinator, sessionId, traceType); @@@ -46,17 -56,45 +62,54 @@@ final int elapsed = elapsed(); executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl)); + if (logger.isTraceEnabled()) + logger.trace("Adding <{}> to trace events", message); } - static void executeMutation(final Mutation mutation) + /** - * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations - * have at least been applied to one replica. This works because the tracking executor only - * has one thread in its pool, see {@link StageManager#tracingExecutor()}. ++ * Wait on submitted futures + */ + protected void waitForPendingEvents() { - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() + if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0) + return; + + try + { + if (logger.isTraceEnabled()) - logger.trace("Waiting for up to {} seconds for trace events to complete", - WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); ++ logger.trace("Waiting for up to {} seconds for {} trace events to complete", ++ +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, pendingFutures.size()); + - StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK) - .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); ++ CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture<?>[pendingFutures.size()])) ++ .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS); ++ } ++ catch (TimeoutException ex) ++ { ++ if (logger.isTraceEnabled()) ++ logger.trace("Failed to wait for tracing events to complete in {} seconds", ++ WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); - logger.debug("Failed to wait for tracing events to complete: {}", t); ++ logger.error("Got exception whilst waiting for tracing events to complete", t); + } + } + - static void executeMutation(final Mutation mutation) ++ ++ void executeMutation(final Mutation mutation) + { - StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable() ++ CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable() { protected void runMayThrow() { mutateWithCatch(mutation); } -- }); ++ }, StageManager.getStage(Stage.TRACING)); ++ ++ boolean ret = pendingFutures.add(fut); ++ if (!ret) ++ logger.warn("Failed to insert pending future, tracing synchronization may not work"); } static void mutateWithCatch(Mutation mutation) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bdaa53de/src/java/org/apache/cassandra/tracing/TracingImpl.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tracing/TracingImpl.java index 52ac183,52ac183..4f69584 --- a/src/java/org/apache/cassandra/tracing/TracingImpl.java +++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java @@@ -28,10 -28,10 +28,6 @@@ import org.apache.cassandra.concurrent. import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.utils.WrappedRunnable; --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; -- -- /** * A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may @@@ -39,31 -39,31 +35,62 @@@ */ class TracingImpl extends Tracing { -- private static final Logger logger = LoggerFactory.getLogger(TracingImpl.class); -- public void stopSessionImpl() { -- TraceState state = get(); ++ final TraceStateImpl state = getStateImpl(); ++ if (state == null) ++ return; ++ int elapsed = state.elapsed(); ByteBuffer sessionId = state.sessionIdBytes; int ttl = state.ttl; -- TraceStateImpl.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl)); ++ ++ state.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl)); } public TraceState begin(final String request, final InetAddress client, final Map<String, String> parameters) { assert isTracing(); -- final TraceState state = get(); ++ final TraceStateImpl state = getStateImpl(); ++ assert state != null; ++ final long startedAt = System.currentTimeMillis(); final ByteBuffer sessionId = state.sessionIdBytes; final String command = state.traceType.toString(); final int ttl = state.ttl; -- TraceStateImpl.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl)); -- ++ state.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl)); return state; } ++ /** ++ * Convert the abstract tracing state to its implementation. ++ * ++ * Expired states are not put in the sessions but the check is for extra safety. ++ * ++ * @return the state converted to its implementation, or null ++ */ ++ private TraceStateImpl getStateImpl() ++ { ++ TraceState state = get(); ++ if (state == null) ++ return null; ++ ++ if (state instanceof ExpiredTraceState) ++ { ++ ExpiredTraceState expiredTraceState = (ExpiredTraceState) state; ++ state = expiredTraceState.getDelegate(); ++ } ++ ++ if (state instanceof TraceStateImpl) ++ { ++ return (TraceStateImpl)state; ++ } ++ ++ assert false : "TracingImpl states should be of type TraceStateImpl"; ++ return null; ++ } ++ @Override protected TraceState newTraceState(InetAddress coordinator, UUID sessionId, TraceType traceType) {
