Repository: cassandra
Updated Branches:
  refs/heads/trunk 260846685 -> 5b09543f6
Allow only one concurrent call to StatusLogger

patch by mszczygiel; reviewed by jasobrown for CASSANDRA-12182

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b09543f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b09543f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b09543f

Branch: refs/heads/trunk
Commit: 5b09543f64eafb1344f7814a80b73d312d5bbc37
Parents: 2608466
Author: mszczygiel <mychal.szczyg...@gmail.com>
Authored: Tue Oct 31 22:46:56 2017 +0100
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Fri Nov 3 17:28:05 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/utils/StatusLogger.java    |  25 ++-
 .../cassandra/utils/StatusLoggerTest.java       | 160 +++++++++++++++++++
 3 files changed, 185 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b09543f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 71f4b1d..e214177 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
  * Refactoring to specialised functional interfaces (CASSANDRA-13982)
  * Speculative retry should allow more friendly params (CASSANDRA-13876)
  * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b09543f/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index c33190b..9f9d869 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ 
b/src/java/org/apache/cassandra/utils/StatusLogger.java @@ -19,11 +19,13 @@ package org.apache.cassandra.utils; import java.lang.management.ManagementFactory; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import javax.management.*; import org.apache.cassandra.cache.*; import org.apache.cassandra.metrics.ThreadPoolMetrics; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,10 +39,31 @@ import org.apache.cassandra.service.CacheService; public class StatusLogger { private static final Logger logger = LoggerFactory.getLogger(StatusLogger.class); - + private static final ReentrantLock busyMonitor = new ReentrantLock(); public static void log() { + // avoid logging more than once at the same time. throw away any attempts to log concurrently, as it would be + // confusing and noisy for operators - and don't bother logging again, immediately as it'll just be the same data + if (busyMonitor.tryLock()) + { + try + { + logStatus(); + } + finally + { + busyMonitor.unlock(); + } + } + else + { + logger.trace("StatusLogger is busy"); + } + } + + private static void logStatus() + { MBeanServer server = ManagementFactory.getPlatformMBeanServer(); // everything from o.a.c.concurrent http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b09543f/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java b/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java new file mode 100644 index 0000000..878e6e8 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.collect.Range; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.AppenderBase; +import org.apache.cassandra.cql3.CQLTester; + +import static com.google.common.collect.Lists.newArrayList; +import static java.util.stream.Collectors.groupingBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class StatusLoggerTest extends CQLTester +{ + private static final Logger log = LoggerFactory.getLogger(StatusLoggerTest.class); + + @Test + public void testStatusLoggerPrintsStatusOnlyOnceWhenInvokedConcurrently() throws Exception + { + ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(StatusLogger.class); + InMemoryAppender inMemoryAppender = new InMemoryAppender(); + logger.addAppender(inMemoryAppender); + logger.setLevel(Level.TRACE); + try + { + submitTwoLogRequestsConcurrently(); + 
verifyOnlySingleStatusWasAppendedConcurrently(inMemoryAppender.events); + } + finally + { + assertTrue("Could not remove in memory appender", logger.detachAppender(inMemoryAppender)); + } + } + + private void submitTwoLogRequestsConcurrently() throws InterruptedException + { + ExecutorService executorService = Executors.newFixedThreadPool(2); + executorService.submit(StatusLogger::log); + executorService.submit(StatusLogger::log); + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.SECONDS); + } + + private void verifyOnlySingleStatusWasAppendedConcurrently(List<ILoggingEvent> events) + { + Map<String, List<ILoggingEvent>> eventsByThread = events.stream().collect(groupingBy(ILoggingEvent::getThreadName)); + List<String> threadNames = newArrayList(eventsByThread.keySet()); + + assertEquals("Expected events from 2 threads only", 2, threadNames.size()); + + List<ILoggingEvent> firstThreadEvents = eventsByThread.get(threadNames.get(0)); + List<ILoggingEvent> secondThreadEvents = eventsByThread.get(threadNames.get(1)); + + assertTrue("Expected at least one event from the first thread", firstThreadEvents.size() >= 1); + assertTrue("Expected at least one event from the second thread", secondThreadEvents.size() >= 1); + + if (areDisjunctive(firstThreadEvents, secondThreadEvents)) + { + log.debug("Event time ranges are disjunctive - log invocations were made one after another"); + } + else + { + verifyStatusWasPrintedAndBusyEventOccured(firstThreadEvents, secondThreadEvents); + } + } + + private boolean areDisjunctive(List<ILoggingEvent> firstThreadEvents, List<ILoggingEvent> secondThreadEvents) + { + Range<Long> firstThreadTimeRange = timestampsRange(firstThreadEvents); + Range<Long> secondThreadTimeRange = timestampsRange(secondThreadEvents); + boolean connected = firstThreadTimeRange.isConnected(secondThreadTimeRange); + boolean disjunctive = !connected || firstThreadTimeRange.intersection(secondThreadTimeRange).isEmpty(); + log.debug("Time 
ranges {}, {}, disjunctive={}", firstThreadTimeRange, secondThreadTimeRange, disjunctive); + return disjunctive; + } + + private Range<Long> timestampsRange(List<ILoggingEvent> events) + { + List<Long> timestamps = events.stream().map(ILoggingEvent::getTimeStamp).collect(Collectors.toList()); + Long min = timestamps.stream().min(Comparator.naturalOrder()).get(); + Long max = timestamps.stream().max(Comparator.naturalOrder()).get(); + // It's open on one side to cover a case when second status starts printing at the same timestamp that previous one was finished + return Range.closedOpen(min, max); + } + + private void verifyStatusWasPrintedAndBusyEventOccured(List<ILoggingEvent> firstThreadEvents, List<ILoggingEvent> secondThreadEvents) + { + if (firstThreadEvents.size() > 1 && secondThreadEvents.size() > 1) + { + log.error("Both event lists contain more than one entry. First = {}, Second = {}", firstThreadEvents, secondThreadEvents); + fail("More that one status log was appended concurrently"); + } + else if (firstThreadEvents.size() <= 1 && secondThreadEvents.size() <= 1) + { + log.error("No status log was recorded. First = {}, Second = {}", firstThreadEvents, secondThreadEvents); + fail("Status log was not appended"); + } + else + { + log.info("Checking if logger was busy. 
First = {}, Second = {}", firstThreadEvents, secondThreadEvents); + assertTrue("One 'logger busy' entry was expected", + isLoggerBusyTheOnlyEvent(firstThreadEvents) || isLoggerBusyTheOnlyEvent(secondThreadEvents)); + } + } + + private boolean isLoggerBusyTheOnlyEvent(List<ILoggingEvent> events) + { + return events.size() == 1 && + events.get(0).getMessage().equals("StatusLogger is busy") && + events.get(0).getLevel() == Level.TRACE; + } + + private static class InMemoryAppender extends AppenderBase<ILoggingEvent> + { + private final List<ILoggingEvent> events = newArrayList(); + + private InMemoryAppender() + { + start(); + } + + @Override + protected synchronized void append(ILoggingEvent event) + { + events.add(event); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org