Repository: cassandra Updated Branches: refs/heads/trunk 44e30e33c -> 6966fcda9
Added slow query log patch by Shogo Hoshii and Stefania Alborghetti; reviewed by Tyler Hobbs and Stefania Alborghetti for CASSANDRA-12403 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6966fcda Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6966fcda Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6966fcda Branch: refs/heads/trunk Commit: 6966fcda9b5ce657760dfe103e5812862306ff7a Parents: 44e30e3 Author: Shogo Hoshii <[email protected]> Authored: Wed Aug 10 09:51:57 2016 +0800 Committer: Stefania Alborghetti <[email protected]> Committed: Wed Aug 17 10:47:14 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + conf/cassandra.yaml | 7 +- .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 5 + .../cassandra/db/ReadCommandVerbHandler.java | 2 +- .../cassandra/db/monitoring/Monitorable.java | 2 + .../db/monitoring/MonitorableImpl.java | 25 +- .../cassandra/db/monitoring/MonitoringTask.java | 271 +++++++++++++++---- .../org/apache/cassandra/net/MessageIn.java | 5 + .../apache/cassandra/service/StorageProxy.java | 2 +- .../db/monitoring/MonitoringTaskTest.java | 166 ++++++++++-- 12 files changed, 398 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dfed9b4..51b87db 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Added slow query log (CASSANDRA-12403) * Count full coordinated request against timeout (CASSANDRA-12256) * Allow TTL with null value on insert and update (CASSANDRA-12216) * Make decommission operation resumable (CASSANDRA-12008) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 86ec36c..a8ba483 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,9 @@ using the provided 'sstableupgrade' tool. New features ------------ + - A slow query log has been added: slow queries will be logged at DEBUG level. + For more details refer to CASSANDRA-12403 and slow_query_log_timeout_in_ms + in cassandra.yaml. - Support for GROUP BY queries has been added. - A new compaction-stress tool has been added to test the throughput of compaction for any cassandra-stress user schema. see compaction-stress help for how to use. http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index e724941..5fb44cf 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1,4 +1,4 @@ -# Cassandra storage config YAML +# Cassandra storage config YAML # NOTE: # See http://wiki.apache.org/cassandra/StorageConfiguration for @@ -855,6 +855,11 @@ truncate_request_timeout_in_ms: 60000 # The default timeout for other, miscellaneous operations request_timeout_in_ms: 10000 +# How long before a node logs slow queries. SELECT queries that take longer than +# this timeout to execute will generate an aggregated log message, so that slow queries +# can be identified. Set this value to zero to disable slow query logging. +slow_query_log_timeout_in_ms: 500 + # Enable operation timeout information exchange between nodes to accurately # measure request timeouts.
If disabled, replicas will assume that requests # were forwarded to them instantly by the coordinator, which means that http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 0dd3cc8..fdf27d9 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -103,6 +103,8 @@ public class Config public boolean cross_node_timeout = false; + public volatile long slow_query_log_timeout_in_ms = 500L; + public volatile Double phi_convict_threshold = 8.0; public Integer concurrent_reads = 32; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 103cb9d..6f71817 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1321,6 +1321,11 @@ public class DatabaseDescriptor } } + public static long getSlowQueryTimeout() + { + return conf.slow_query_log_timeout_in_ms; + } + /** * @return the minimum configured {read, write, range, truncate, misc} timeout */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index e2a9678..7948590 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ 
b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -41,7 +41,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> } ReadCommand command = message.payload; - command.setMonitoringTime(message.constructionTime, message.getTimeout()); + command.setMonitoringTime(message.constructionTime, message.getTimeout(), message.getSlowQueryTimeout()); ReadResponse response; try (ReadExecutionController executionController = command.executionController(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/monitoring/Monitorable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java index 202ac87..f4c5ee8 100644 --- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java +++ b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java @@ -23,10 +23,12 @@ public interface Monitorable String name(); ConstructionTime constructionTime(); long timeout(); + long slowTimeout(); boolean isInProgress(); boolean isAborted(); boolean isCompleted(); + boolean isSlow(); boolean abort(); boolean complete(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java index f89f8ad..7363e10 100644 --- a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java +++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java @@ -21,12 +21,15 @@ package org.apache.cassandra.db.monitoring; public abstract class MonitorableImpl implements Monitorable { private MonitoringState state; + private boolean isSlow; private ConstructionTime constructionTime; private 
long timeout; + private long slowTimeout; protected MonitorableImpl() { this.state = MonitoringState.IN_PROGRESS; + this.isSlow = false; } /** @@ -34,10 +37,11 @@ public abstract class MonitorableImpl implements Monitorable * is too complex, it would require passing new parameters to all serializers * or specializing the serializers to accept these message properties. */ - public void setMonitoringTime(ConstructionTime constructionTime, long timeout) + public void setMonitoringTime(ConstructionTime constructionTime, long timeout, long slowTimeout) { this.constructionTime = constructionTime; this.timeout = timeout; + this.slowTimeout = slowTimeout; } public ConstructionTime constructionTime() @@ -50,6 +54,11 @@ public abstract class MonitorableImpl implements Monitorable return timeout; } + public long slowTimeout() + { + return slowTimeout; + } + public boolean isInProgress() { check(); @@ -68,12 +77,19 @@ public abstract class MonitorableImpl implements Monitorable return state == MonitoringState.COMPLETED; } + public boolean isSlow() + { + check(); + return isSlow; + } + public boolean abort() { if (state == MonitoringState.IN_PROGRESS) { if (constructionTime != null) MonitoringTask.addFailedOperation(this, ApproximateTime.currentTimeMillis()); + state = MonitoringState.ABORTED; return true; } @@ -85,6 +101,9 @@ public abstract class MonitorableImpl implements Monitorable { if (state == MonitoringState.IN_PROGRESS) { + if (isSlow && slowTimeout > 0 && constructionTime != null) + MonitoringTask.addSlowOperation(this, ApproximateTime.currentTimeMillis()); + state = MonitoringState.COMPLETED; return true; } @@ -98,6 +117,10 @@ public abstract class MonitorableImpl implements Monitorable return; long elapsed = ApproximateTime.currentTimeMillis() - constructionTime.timestamp; + + if (elapsed >= slowTimeout && !isSlow) + isSlow = true; + if (elapsed >= timeout) abort(); } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java index a44773a..b116485 100644 --- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java +++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java @@ -36,17 +36,20 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.Config; +import org.apache.cassandra.utils.NoSpamLogger; import static java.lang.System.getProperty; /** * A task for monitoring in progress operations, currently only read queries, and aborting them if they time out. * We also log timed out operations, see CASSANDRA-7392. + * Since CASSANDRA-12403 we also log queries that were slow. */ -public class MonitoringTask +class MonitoringTask { private static final String LINE_SEPARATOR = getProperty("line.separator"); private static final Logger logger = LoggerFactory.getLogger(MonitoringTask.class); + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES); /** * Defines the interval for reporting any operations that have timed out. 
@@ -62,12 +65,12 @@ public class MonitoringTask @VisibleForTesting static MonitoringTask instance = make(REPORT_INTERVAL_MS, MAX_OPERATIONS); - private final int maxOperations; private final ScheduledFuture<?> reportingTask; - private final BlockingQueue<FailedOperation> operationsQueue; - private final AtomicLong numDroppedOperations; + private final OperationsQueue failedOperationsQueue; + private final OperationsQueue slowOperationsQueue; private long lastLogTime; + @VisibleForTesting static MonitoringTask make(int reportIntervalMillis, int maxTimedoutOperations) { @@ -82,13 +85,13 @@ public class MonitoringTask private MonitoringTask(int reportIntervalMillis, int maxOperations) { - this.maxOperations = maxOperations; - this.operationsQueue = maxOperations > 0 ? new ArrayBlockingQueue<>(maxOperations) : new LinkedBlockingQueue<>(); - this.numDroppedOperations = new AtomicLong(); + this.failedOperationsQueue = new OperationsQueue(maxOperations); + this.slowOperationsQueue = new OperationsQueue(maxOperations); + this.lastLogTime = ApproximateTime.currentTimeMillis(); logger.info("Scheduling monitoring task with report interval of {} ms, max operations {}", reportIntervalMillis, maxOperations); - this.reportingTask = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(() -> logFailedOperations(ApproximateTime.currentTimeMillis()), + this.reportingTask = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(() -> logOperations(ApproximateTime.currentTimeMillis()), reportIntervalMillis, reportIntervalMillis, TimeUnit.MILLISECONDS); @@ -99,57 +102,51 @@ public class MonitoringTask reportingTask.cancel(false); } - public static void addFailedOperation(Monitorable operation, long now) + static void addFailedOperation(Monitorable operation, long now) { - instance.innerAddFailedOperation(operation, now); + instance.failedOperationsQueue.offer(new FailedOperation(operation, now)); } - private void innerAddFailedOperation(Monitorable operation, long now) + static 
void addSlowOperation(Monitorable operation, long now) { - if (maxOperations == 0) - return; // logging of failed operations disabled - - if (!operationsQueue.offer(new FailedOperation(operation, now))) - numDroppedOperations.incrementAndGet(); + instance.slowOperationsQueue.offer(new SlowOperation(operation, now)); } @VisibleForTesting - FailedOperations aggregateFailedOperations() + List<String> getFailedOperations() { - Map<String, FailedOperation> operations = new HashMap<>(); + return getLogMessages(failedOperationsQueue.popOperations()); + } - FailedOperation failedOperation; - while((failedOperation = operationsQueue.poll()) != null) - { - FailedOperation existing = operations.get(failedOperation.name()); - if (existing != null) - existing.addTimeout(failedOperation); - else - operations.put(failedOperation.name(), failedOperation); - } + @VisibleForTesting + List<String> getSlowOperations() + { + return getLogMessages(slowOperationsQueue.popOperations()); + } - return new FailedOperations(operations, numDroppedOperations.getAndSet(0L)); + private List<String> getLogMessages(AggregatedOperations operations) + { + String ret = operations.getLogMessage(); + return ret.isEmpty() ? Collections.emptyList() : Arrays.asList(ret.split("\n")); } @VisibleForTesting - List<String> getFailedOperations() + private void logOperations(long now) { - FailedOperations failedOperations = aggregateFailedOperations(); - String ret = failedOperations.getLogMessage(); - lastLogTime = ApproximateTime.currentTimeMillis(); - return ret.isEmpty() ? 
Collections.emptyList() : Arrays.asList(ret.split("\n")); + logSlowOperations(now); + logFailedOperations(now); + + lastLogTime = now; } @VisibleForTesting - void logFailedOperations(long now) + boolean logFailedOperations(long now) { - FailedOperations failedOperations = aggregateFailedOperations(); + AggregatedOperations failedOperations = failedOperationsQueue.popOperations(); if (!failedOperations.isEmpty()) { long elapsed = now - lastLogTime; - logger.warn("{} operations timed out in the last {} msecs, operation list available at debug log level", - failedOperations.num(), - elapsed); + noSpamLogger.warn("Some operations timed out, details available at debug level (debug.log)"); if (logger.isDebugEnabled()) logger.debug("{} operations timed out in the last {} msecs:{}{}", @@ -157,17 +154,109 @@ public class MonitoringTask elapsed, LINE_SEPARATOR, failedOperations.getLogMessage()); + return true; } - lastLogTime = now; + return false; } - private static final class FailedOperations + @VisibleForTesting + boolean logSlowOperations(long now) { - public final Map<String, FailedOperation> operations; - public final long numDropped; + AggregatedOperations slowOperations = slowOperationsQueue.popOperations(); + if (!slowOperations.isEmpty()) + { + long elapsed = now - lastLogTime; + noSpamLogger.info("Some operations were slow, details available at debug level (debug.log)"); - FailedOperations(Map<String, FailedOperation> operations, long numDropped) + if (logger.isDebugEnabled()) + logger.debug("{} operations were slow in the last {} msecs:{}{}", + slowOperations.num(), + elapsed, + LINE_SEPARATOR, + slowOperations.getLogMessage()); + return true; + } + return false; + } + + /** + * A wrapper for a queue that can be either bounded, in which case + * we increment a counter if we exceed the queue size, or unbounded. + */ + private static final class OperationsQueue + { + /** The max operations on the queue. 
If this value is zero then logging is disabled + * and the queue will always be empty. If this value is negative then the queue is unbounded. + */ + private final int maxOperations; + + /** + * The operations queue, it can be either bounded or unbounded depending on the value of maxOperations. + */ + private final BlockingQueue<Operation> queue; + + /** + * If we fail to add an operation to the queue then we increment this value. We reset this value + * when the queue is emptied. + */ + private final AtomicLong numDroppedOperations; + + OperationsQueue(int maxOperations) + { + this.maxOperations = maxOperations; + this.queue = maxOperations > 0 ? new ArrayBlockingQueue<>(maxOperations) : new LinkedBlockingQueue<>(); + this.numDroppedOperations = new AtomicLong(); + } + + /** + * Add an operation to the queue, if possible, or increment the dropped counter. + * + * @param operation - the operations to add + */ + private void offer(Operation operation) + { + if (maxOperations == 0) + return; // logging of operations is disabled + + if (!queue.offer(operation)) + numDroppedOperations.incrementAndGet(); + } + + + /** + * Return all operations in the queue, aggregated by name, and reset + * the counter for dropped operations. + * + * @return - the aggregated operations + */ + private AggregatedOperations popOperations() + { + Map<String, Operation> operations = new HashMap<>(); + + Operation operation; + while((operation = queue.poll()) != null) + { + Operation existing = operations.get(operation.name()); + if (existing != null) + existing.add(operation); + else + operations.put(operation.name(), operation); + } + return new AggregatedOperations(operations, numDroppedOperations.getAndSet(0L)); + } + } + + /** + * Convert a map of aggregated operations into a log message that + * includes the information of whether some operations were dropped. 
+ */ + private static final class AggregatedOperations + { + private final Map<String, Operation> operations; + private final long numDropped; + + AggregatedOperations(Map<String, Operation> operations, long numDropped) { this.operations = operations; this.numDropped = numDropped; @@ -183,7 +272,7 @@ public class MonitoringTask return operations.size() + numDropped; } - public String getLogMessage() + String getLogMessage() { if (isEmpty()) return ""; @@ -200,7 +289,7 @@ public class MonitoringTask return ret.toString(); } - private static void addOperation(StringBuilder ret, FailedOperation operation) + private static void addOperation(StringBuilder ret, Operation operation) { if (ret.length() > 0) ret.append(LINE_SEPARATOR); @@ -209,19 +298,38 @@ public class MonitoringTask } } - private final static class FailedOperation + /** + * A wrapper class for an operation that either failed (timed-out) or + * was reported as slow. Because the same operation (query) may execute + * multiple times, we aggregate the number of times an operation with the + * same name (CQL query text) is reported and store the average, min and max + * times. + */ + protected abstract static class Operation { - public final Monitorable operation; - public int numTimeouts; - public long totalTime; - public long maxTime; - public long minTime; + /** The operation that was reported as slow or timed out */ + final Monitorable operation; + + /** The number of times the operation was reported */ + int numTimesReported; + + /** The total time spent by this operation */ + long totalTime; + + /** The maximum time spent by this operation */ + long maxTime; + + /** The minimum time spent by this operation */ + long minTime; + + /** The name of the operation, i.e. 
the SELECT query CQL, + * this is set lazily as it takes time to build the query CQL */ private String name; - FailedOperation(Monitorable operation, long failedAt) + Operation(Monitorable operation, long failedAt) { this.operation = operation; - numTimeouts = 1; + numTimesReported = 1; totalTime = failedAt - operation.constructionTime().timestamp; minTime = totalTime; maxTime = totalTime; @@ -234,31 +342,74 @@ public class MonitoringTask return name; } - void addTimeout(FailedOperation operation) + void add(Operation operation) { - numTimeouts++; + numTimesReported++; totalTime += operation.totalTime; maxTime = Math.max(maxTime, operation.maxTime); minTime = Math.min(minTime, operation.minTime); } + public abstract String getLogMessage(); + } + + /** + * An operation (query) that timed out. + */ + private final static class FailedOperation extends Operation + { + FailedOperation(Monitorable operation, long failedAt) + { + super(operation, failedAt); + } + public String getLogMessage() { - if (numTimeouts == 1) - return String.format("%s: total time %d msec - timeout %d %s", + if (numTimesReported == 1) + return String.format("<%s>, total time %d msec, timeout %d %s", name(), totalTime, operation.timeout(), operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); else - return String.format("%s (timed out %d times): total time avg/min/max %d/%d/%d msec - timeout %d %s", + return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s", name(), - numTimeouts, - totalTime / numTimeouts, + numTimesReported, + totalTime / numTimesReported, minTime, maxTime, operation.timeout(), operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); } } + + /** + * An operation (query) that was reported as slow. 
+ */ + private final static class SlowOperation extends Operation + { + SlowOperation(Monitorable operation, long failedAt) + { + super(operation, failedAt); + } + + public String getLogMessage() + { + if (numTimesReported == 1) + return String.format("<%s>, time %d msec - slow timeout %d %s", + name(), + totalTime, + operation.slowTimeout(), + operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); + else + return String.format("<%s>, was slow %d times: avg/min/max %d/%d/%d msec - slow timeout %d %s", + name(), + numTimesReported, + totalTime / numTimesReported, + minTime, + maxTime, + operation.slowTimeout(), + operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec"); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java index df1b4e1..23b2995 100644 --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@ -157,6 +157,11 @@ public class MessageIn<T> return DatabaseDescriptor.getTimeout(verb); } + public long getSlowQueryTimeout() + { + return DatabaseDescriptor.getSlowQueryTimeout(); + } + public String toString() { StringBuilder sbuf = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 9bf90dc..9cfbd68 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1841,7 +1841,7 @@ public class StorageProxy implements StorageProxyMBean { try { - 
command.setMonitoringTime(new ConstructionTime(constructionTime), timeout); + command.setMonitoringTime(new ConstructionTime(constructionTime), timeout, DatabaseDescriptor.getSlowQueryTimeout()); ReadResponse response; try (ReadExecutionController executionController = command.executionController(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6966fcda/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java index 4490519..14659e3 100644 --- a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java +++ b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@ -39,10 +40,12 @@ import static org.junit.Assert.fail; public class MonitoringTaskTest { private static final long timeout = 100; + private static final long slowTimeout = 10; + private static final long MAX_SPIN_TIME_NANOS = TimeUnit.SECONDS.toNanos(5); - public static final int REPORT_INTERVAL_MS = 600000; // long enough so that it won't check unless told to do so - public static final int MAX_TIMEDOUT_OPERATIONS = -1; // unlimited + private static final int REPORT_INTERVAL_MS = 600000; // long enough so that it won't check unless told to do so + private static final int MAX_TIMEDOUT_OPERATIONS = -1; // unlimited @BeforeClass public static void setup() @@ -50,14 +53,22 @@ public class MonitoringTaskTest MonitoringTask.instance = MonitoringTask.make(REPORT_INTERVAL_MS, MAX_TIMEDOUT_OPERATIONS); } + @After + public void cleanUp() + { + // these clear the queues of the monitorint task + 
MonitoringTask.instance.getSlowOperations(); + MonitoringTask.instance.getFailedOperations(); + } + private static final class TestMonitor extends MonitorableImpl { private final String name; - TestMonitor(String name, ConstructionTime constructionTime, long timeout) + TestMonitor(String name, ConstructionTime constructionTime, long timeout, long slow) { this.name = name; - setMonitoringTime(constructionTime, timeout); + setMonitoringTime(constructionTime, timeout, slow); } public String name() @@ -88,15 +99,32 @@ public class MonitoringTaskTest long numInProgress = operations.stream().filter(Monitorable::isInProgress).count(); if (numInProgress == 0) return; + } + } + + private static void waitForOperationsToBeReportedAsSlow(Monitorable... operations) throws InterruptedException + { + waitForOperationsToBeReportedAsSlow(Arrays.asList(operations)); + } + + private static void waitForOperationsToBeReportedAsSlow(List<Monitorable> operations) throws InterruptedException + { + long timeout = operations.stream().map(Monitorable::slowTimeout).reduce(0L, Long::max); + Thread.sleep(timeout * 2 + ApproximateTime.precision()); - Thread.yield(); + long start = System.nanoTime(); + while(System.nanoTime() - start <= MAX_SPIN_TIME_NANOS) + { + long numSlow = operations.stream().filter(Monitorable::isSlow).count(); + if (numSlow == operations.size()) + return; } } @Test public void testAbort() throws InterruptedException { - Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout); + Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); waitForOperationsToComplete(operation); assertTrue(operation.isAborted()); @@ -107,7 +135,7 @@ public class MonitoringTaskTest @Test public void testAbortIdemPotent() throws InterruptedException { - Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout); + 
Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); waitForOperationsToComplete(operation); assertTrue(operation.abort()); @@ -120,7 +148,7 @@ public class MonitoringTaskTest @Test public void testAbortCrossNode() throws InterruptedException { - Monitorable operation = new TestMonitor("Test for cross node", new ConstructionTime(System.currentTimeMillis(), true), timeout); + Monitorable operation = new TestMonitor("Test for cross node", new ConstructionTime(System.currentTimeMillis(), true), timeout, slowTimeout); waitForOperationsToComplete(operation); assertTrue(operation.isAborted()); @@ -131,7 +159,7 @@ public class MonitoringTaskTest @Test public void testComplete() throws InterruptedException { - Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout); + Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); operation.complete(); waitForOperationsToComplete(operation); @@ -143,7 +171,7 @@ public class MonitoringTaskTest @Test public void testCompleteIdemPotent() throws InterruptedException { - Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout); + Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); operation.complete(); waitForOperationsToComplete(operation); @@ -155,14 +183,47 @@ public class MonitoringTaskTest } @Test + public void testReportSlow() throws InterruptedException + { + Monitorable operation = new TestMonitor("Test report slow", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + waitForOperationsToBeReportedAsSlow(operation); + + assertTrue(operation.isSlow()); + operation.complete(); + assertFalse(operation.isAborted()); + assertTrue(operation.isCompleted()); + assertEquals(1, 
MonitoringTask.instance.getSlowOperations().size()); + } + + @Test + public void testNoReportSlowIfZeroSlowTimeout() throws InterruptedException + { + // when the slow timeout is set to zero then operation won't be reported as slow + Monitorable operation = new TestMonitor("Test report slow disabled", new ConstructionTime(System.currentTimeMillis()), timeout, 0); + waitForOperationsToBeReportedAsSlow(operation); + + assertTrue(operation.isSlow()); + operation.complete(); + assertFalse(operation.isAborted()); + assertTrue(operation.isCompleted()); + assertEquals(0, MonitoringTask.instance.getSlowOperations().size()); + } + + @Test public void testReport() throws InterruptedException { - Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout); + Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); waitForOperationsToComplete(operation); + assertTrue(operation.isSlow()); assertTrue(operation.isAborted()); assertFalse(operation.isCompleted()); - MonitoringTask.instance.logFailedOperations(ApproximateTime.currentTimeMillis()); + + // aborted operations are not logged as slow + assertFalse(MonitoringTask.instance.logSlowOperations(ApproximateTime.currentTimeMillis())); + assertEquals(0, MonitoringTask.instance.getSlowOperations().size()); + + assertTrue(MonitoringTask.instance.logFailedOperations(ApproximateTime.currentTimeMillis())); assertEquals(0, MonitoringTask.instance.getFailedOperations().size()); } @@ -172,14 +233,22 @@ public class MonitoringTaskTest MonitoringTask.instance = MonitoringTask.make(10, -1); try { - Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout); - waitForOperationsToComplete(operation); + Monitorable operation1 = new TestMonitor("Test report 1", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + 
waitForOperationsToComplete(operation1); + + assertTrue(operation1.isAborted()); + assertFalse(operation1.isCompleted()); + + Monitorable operation2 = new TestMonitor("Test report 2", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout); + waitForOperationsToBeReportedAsSlow(operation2); - assertTrue(operation.isAborted()); - assertFalse(operation.isCompleted()); + operation2.complete(); + assertFalse(operation2.isAborted()); + assertTrue(operation2.isCompleted()); Thread.sleep(ApproximateTime.precision() + 500); assertEquals(0, MonitoringTask.instance.getFailedOperations().size()); + assertEquals(0, MonitoringTask.instance.getSlowOperations().size()); } finally { @@ -197,7 +266,7 @@ public class MonitoringTaskTest for (int i = 0; i < opCount; i++) { executorService.submit(() -> - operations.add(new TestMonitor(UUID.randomUUID().toString(), new ConstructionTime(), timeout)) + operations.add(new TestMonitor(UUID.randomUUID().toString(), new ConstructionTime(), timeout, slowTimeout)) ); } @@ -207,6 +276,7 @@ public class MonitoringTaskTest waitForOperationsToComplete(operations); assertEquals(opCount, MonitoringTask.instance.getFailedOperations().size()); + assertEquals(0, MonitoringTask.instance.getSlowOperations().size()); } @Test @@ -228,11 +298,10 @@ public class MonitoringTaskTest MonitoringTask.instance = MonitoringTask.make(REPORT_INTERVAL_MS, maxTimedoutOperations); try { - final int threadCount = numThreads; - ExecutorService executorService = Executors.newFixedThreadPool(threadCount); - final CountDownLatch finished = new CountDownLatch(threadCount); + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + final CountDownLatch finished = new CountDownLatch(numThreads); - for (int i = 0; i < threadCount; i++) + for (int i = 0; i < numThreads; i++) { final String operationName = "Operation " + Integer.toString(i+1); final int numTimes = i + 1; @@ -241,10 +310,16 @@ public class MonitoringTaskTest { for (int j = 0; j < 
numTimes; j++) { - Monitorable operation = new TestMonitor(operationName, + Monitorable operation1 = new TestMonitor(operationName, new ConstructionTime(System.currentTimeMillis()), - timeout); - waitForOperationsToComplete(operation); + timeout, slowTimeout); + waitForOperationsToComplete(operation1); + + Monitorable operation2 = new TestMonitor(operationName, + new ConstructionTime(System.currentTimeMillis()), + timeout, slowTimeout); + waitForOperationsToBeReportedAsSlow(operation2); + operation2.complete(); } } catch (InterruptedException e) @@ -274,7 +349,7 @@ public class MonitoringTaskTest } @Test - public void testMultipleThreadsSameName() throws InterruptedException + public void testMultipleThreadsSameNameFailed() throws InterruptedException { final int threadCount = 50; final List<Monitorable> operations = new ArrayList<>(threadCount); @@ -286,9 +361,9 @@ public class MonitoringTaskTest executorService.submit(() -> { try { - Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName", + Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName failed", new ConstructionTime(System.currentTimeMillis()), - timeout); + timeout, slowTimeout); operations.add(operation); } finally @@ -302,11 +377,44 @@ public class MonitoringTaskTest assertEquals(0, executorService.shutdownNow().size()); waitForOperationsToComplete(operations); - //MonitoringTask.instance.checkFailedOperations(ApproximateTime.currentTimeMillis()); assertEquals(1, MonitoringTask.instance.getFailedOperations().size()); } @Test + public void testMultipleThreadsSameNameSlow() throws InterruptedException + { + final int threadCount = 50; + final List<Monitorable> operations = new ArrayList<>(threadCount); + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + final CountDownLatch finished = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) + { + executorService.submit(() -> { + try + { + Monitorable operation = new 
TestMonitor("Test testMultipleThreadsSameName slow", + new ConstructionTime(System.currentTimeMillis()), + timeout, slowTimeout); + operations.add(operation); + } + finally + { + finished.countDown(); + } + }); + } + + finished.await(); + assertEquals(0, executorService.shutdownNow().size()); + + waitForOperationsToBeReportedAsSlow(operations); + operations.forEach(o -> o.complete()); + + assertEquals(1, MonitoringTask.instance.getSlowOperations().size()); + } + + @Test public void testMultipleThreadsNoFailedOps() throws InterruptedException { final int threadCount = 50; @@ -321,7 +429,7 @@ public class MonitoringTaskTest { Monitorable operation = new TestMonitor("Test thread " + Thread.currentThread().getName(), new ConstructionTime(System.currentTimeMillis()), - timeout); + timeout, slowTimeout); operations.add(operation); operation.complete(); }
