ARTEMIS-1294 Using older sleep on TimedBuffer And also adding test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ad372ec9 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ad372ec9 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ad372ec9 Branch: refs/heads/master Commit: ad372ec98e0d8e5765eef56883ea29b10768c70e Parents: 41a03de Author: Clebert Suconic <[email protected]> Authored: Tue Jul 18 10:03:47 2017 -0400 Committer: Clebert Suconic <[email protected]> Committed: Tue Jul 18 16:01:51 2017 -0400 ---------------------------------------------------------------------- .../artemis/core/io/buffer/TimedBuffer.java | 26 +++++- .../unit/core/journal/impl/TimedBufferTest.java | 94 ++++++++++++++++++++ 2 files changed, 117 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad372ec9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index 2713255..087453d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -23,7 +23,6 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.LockSupport; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -48,7 +47,7 @@ public final class TimedBuffer { // prevent that private final Semaphore spinLimiter = new Semaphore(1); - private CheckTimer timerRunnable = null; + private CheckTimer timerRunnable; private final int bufferSize; @@ -371,6 +370,9 @@ public final class TimedBuffer { int failedChecks = 0; long timeBefore = 0; + final int sleepMillis = timeout / 1000000; // truncates + final int sleepNanos = timeout % 1000000; + @Override public void run() { long lastFlushTime = 0; @@ -419,7 +421,14 @@ public final class TimedBuffer { timeBefore = System.nanoTime(); } - LockSupport.parkNanos(timeout); + try { + sleep(sleepMillis, sleepNanos); + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } catch (Exception e) { + useSleep = false; + ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e); + } if (checks < MAX_CHECKS_ON_SLEEP) { long realTimeSleep = System.nanoTime() - timeBefore; @@ -445,6 +454,17 @@ public final class TimedBuffer { } /** + * Sub classes (tests basically) can use this to override how the sleep is being done + * + * @param sleepMillis + * @param sleepNanos + * @throws InterruptedException + */ + protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException { + Thread.sleep(sleepMillis, sleepNanos); + } + + /** * Sub classes (tests basically) can use this to override disabling spinning */ protected void stopSpin() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad372ec9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java index b2f65cd..bddb7ea 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -27,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ReusableLatch; import org.junit.Assert; import org.junit.Test; @@ -122,6 +124,98 @@ public class TimedBufferTest extends ActiveMQTestBase { } } + @Test + public void testTimeOnTimedBuffer() throws Exception { + final ReusableLatch latchFlushed = new ReusableLatch(0); + final AtomicInteger flushes = new AtomicInteger(0); + class TestObserver implements TimedBufferObserver { + + @Override + public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) { + for (IOCallback callback : callbacks) { + callback.done(); + } + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int) + */ + @Override + public ByteBuffer newBuffer(final int minSize, final int maxSize) { + return ByteBuffer.allocate(maxSize); + } + + @Override + public int getRemainingBytes() { + return 1024 * 1024; + } + } + + TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false); + + timedBuffer.start(); + + TestObserver observer = new TestObserver(); + timedBuffer.setObserver(observer); + + + int x = 0; + + byte[] bytes = new byte[10]; + for (int j = 0; j < 10; j++) { + bytes[j] = ActiveMQTestBase.getSamplebyte(x++); + } + + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes); + + IOCallback callback = new IOCallback() { + @Override + public void done() { + System.out.println("done"); + latchFlushed.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + + } + }; + + + try { + latchFlushed.setCount(2); + + // simulating a low load period + timedBuffer.addBytes(buff, true, callback); + Thread.sleep(1000); + timedBuffer.addBytes(buff, true, callback); + Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS)); + latchFlushed.setCount(5); + + + flushes.set(0); + + // Sending like crazy... still some wait (1 millisecond) between each send.. + long time = System.currentTimeMillis(); + for (int i = 0; i < 5; i++) { + timedBuffer.addBytes(buff, true, callback); + Thread.sleep(1); + } + Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS)); + + // The purpose of the timed buffer is to batch writes up to a millisecond.. or up to the size of the buffer. + Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 500); + + // it should be in fact only writing once.. + // i will set for 3 just in case there's a GC or anything else happening on the test + Assert.assertTrue("Too many writes were called", flushes.get() <= 3); + } finally { + timedBuffer.stop(); + } + + + + } @Test public void testTimingAndFlush() throws Exception {
