Repository: activemq-artemis Updated Branches: refs/heads/master 661f695ee -> 8182f8bd2
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/AsynchronousFileTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/AsynchronousFileTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/AsynchronousFileTest.java deleted file mode 100644 index 434dcc8..0000000 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/AsynchronousFileTest.java +++ /dev/null @@ -1,1015 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.unit.core.asyncio; - -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.lang.ref.WeakReference; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharsetEncoder; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.asyncio.AIOCallback; -import org.apache.activemq.artemis.core.asyncio.BufferCallback; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; -import org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory; -import org.apache.activemq.artemis.tests.unit.UnitTestLogger; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM - * If you are running this test in eclipse you should do: - * I - Run->Open Run Dialog - * II - Find the class on the list (you will find it if you already tried running this testcase before) - * III - Add -Djava.library.path=<your project place>/native/src/.libs - */ -public class AsynchronousFileTest extends AIOTestBase -{ - - @BeforeClass - public static void hasAIO() - { - org.junit.Assume.assumeTrue("Test case needs AIO to run", AIOSequentialFileFactory.isSupported()); - } - - private static CharsetEncoder UTF_8_ENCODER = StandardCharsets.UTF_8.newEncoder(); - - byte[] commonBuffer = null; - - ExecutorService executor; - - ExecutorService pollerExecutor; - - private AsynchronousFileImpl controller; - private ByteBuffer buffer; - - private final BufferCallback bufferCallbackDestroy = new BufferCallback() - { - - public void bufferDone(final ByteBuffer buffer1) - { - AsynchronousFileImpl.destroyBuffer(buffer1); - } - - }; - - private static void debug(final String msg) - { - UnitTestLogger.LOGGER.debug(msg); - } - - @Override - @Before - public void setUp() throws Exception - { - super.setUp(); - pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this), - false, - this.getClass().getClassLoader())); - executor = Executors.newSingleThreadExecutor(); - } - - @Override - @After - public void tearDown() throws Exception - { - destroy(buffer); - if (controller != null) - { - try - { - controller.close(); - } - catch (Exception e) - { - // ignored - } - } - executor.shutdown(); - pollerExecutor.shutdown(); - super.tearDown(); - } - - /** - * Opening and closing a file immediately can lead to races on the native layer, - * creating crash conditions. - */ - @Test - public void testOpenClose() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - for (int i = 0; i < 100; i++) - { - controller.open(fileName, 10000); - controller.close(); - - } - } - - @Test - public void testReleaseBuffers() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - WeakReference<ByteBuffer> bufferCheck = null; - - controller.open(fileName, 10000); - bufferCheck = new WeakReference<ByteBuffer>(controller.getHandler()); - controller.fill(0, 10, 1024, (byte) 0); - - ByteBuffer write = AsynchronousFileImpl.newBuffer(1024); - - for (int i = 0; i < 1024; i++) - { - write.put(ActiveMQTestBase.getSamplebyte(i)); - } - - final CountDownLatch latch = new CountDownLatch(1); - - controller.write(0, 1024, write, new AIOCallback() - { - - public void onError(final int errorCode, final String errorMessage) - { - } - - public void done() - { - latch.countDown(); - } - }); - - assertTrue(latch.await(10, TimeUnit.SECONDS)); - - WeakReference<ByteBuffer> bufferCheck2 = new WeakReference<ByteBuffer>(write); - - destroy(write); - - write = null; - - ActiveMQTestBase.forceGC(bufferCheck2, 5000); - - assertNull(bufferCheck2.get()); - - controller.close(); - - controller = null; - - ActiveMQTestBase.forceGC(bufferCheck, 5000); - - assertNull(bufferCheck.get()); - } - - @Test - public void testFileNonExistent() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - for (int i = 0; i < 100; i++) - { - try - { - controller.open("/non-existent/IDontExist.error", 10000); - fail("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail."); - } - catch (Exception ignored) - { - } - try - { - controller.close(); - fail("Supposed to throw exception as the file wasn't opened"); - } - catch (Exception ignored) - { - } - - } - } - - /** - * This test is validating if the AIO layer can open two different - * simultaneous files without loose any callbacks. This test made the native - * layer to crash at some point during development - */ - @Test - public void testTwoFiles() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - final AsynchronousFileImpl controller2 = new AsynchronousFileImpl(executor, pollerExecutor); - controller.open(fileName + ".1", 10000); - controller2.open(fileName + ".2", 10000); - - int numberOfLines = 1000; - int size = 1024; - - ArrayList<Integer> listResult1 = new ArrayList<Integer>(); - ArrayList<Integer> listResult2 = new ArrayList<Integer>(); - - AtomicInteger errors = new AtomicInteger(0); - - - try - { - CountDownLatch latchDone = new CountDownLatch(numberOfLines); - CountDownLatch latchDone2 = new CountDownLatch(numberOfLines); - - buffer = AsynchronousFileImpl.newBuffer(size); - encodeBufer(buffer); - - preAlloc(controller, numberOfLines * size); - preAlloc(controller2, numberOfLines * size); - - ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>(); - ArrayList<CountDownCallback> list2 = new ArrayList<CountDownCallback>(); - - for (int i = 0; i < numberOfLines; i++) - { - list.add(new CountDownCallback(latchDone, errors, listResult1, i)); - list2.add(new CountDownCallback(latchDone2, errors, listResult2, i)); - } - - int counter = 0; - - Iterator<CountDownCallback> iter2 = list2.iterator(); - - for (CountDownCallback cb1 : list) - { - CountDownCallback cb2 = iter2.next(); - - controller.write(counter * size, size, buffer, cb1); - controller2.write(counter * size, size, buffer, cb2); - ++counter; - - } - - ActiveMQTestBase.waitForLatch(latchDone); - ActiveMQTestBase.waitForLatch(latchDone2); - - CountDownCallback.checkResults(numberOfLines, listResult1); - CountDownCallback.checkResults(numberOfLines, listResult2); - - for (CountDownCallback callback : list) - { - assertEquals(1, callback.timesDoneCalled.get()); - assertTrue(callback.doneCalled); - } - - for (CountDownCallback callback : list2) - { - assertEquals(1, callback.timesDoneCalled.get()); - assertTrue(callback.doneCalled); - } - - assertEquals(0, errors.get()); - - controller.close(); - } - finally - { - try - { - controller2.close(); - } - catch (Exception ignored) - { - } - } - } - - @Test - public void testAddBeyongSimultaneousLimit() throws Exception - { - asyncData(3000, 1024, 10); - } - - @Test - public void testAddAsyncData() throws Exception - { - asyncData(10000, 1024, 30000); - } - - private static final class LocalCallback implements AIOCallback - { - private final CountDownLatch latch = new CountDownLatch(1); - - volatile boolean error; - - public void done() - { - latch.countDown(); - } - - public void onError(final int errorCode, final String errorMessage) - { - error = true; - latch.countDown(); - } - } - - @Test - public void testReleaseNullBuffer() throws Exception - { - boolean failed = false; - try - { - AsynchronousFileImpl.destroyBuffer(null); - } - catch (Exception expected) - { - failed = true; - } - - assertTrue("Exception expected", failed); - } - - @Test - public void testInvalidReads() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - - final int SIZE = 512; - - controller.open(fileName, 10); - controller.close(); - - controller = new AsynchronousFileImpl(executor, pollerExecutor); - - controller.open(fileName, 10); - - controller.fill(0, 1, 512, (byte) 'j'); - - buffer = AsynchronousFileImpl.newBuffer(SIZE); - - buffer.clear(); - - for (int i = 0; i < SIZE; i++) - { - buffer.put((byte) (i % 100)); - } - - LocalCallback callbackLocal = new LocalCallback(); - - controller.write(0, 512, buffer, callbackLocal); - - waitForLatch(callbackLocal.latch); - - { - ByteBuffer newBuffer = AsynchronousFileImpl.newBuffer(512); - - try - { - callbackLocal = new LocalCallback(); - - controller.read(0, 50, newBuffer, callbackLocal); - - waitForLatch(callbackLocal.latch); - - assertTrue(callbackLocal.error); - - } - finally - { - // We have to destroy the native buffer manually as it was created with a malloc like C function - destroy(newBuffer); - newBuffer = null; - } - } - callbackLocal = new LocalCallback(); - - byte[] bytes = new byte[512]; - - { - try - { - ByteBuffer newBuffer = ByteBuffer.wrap(bytes); - - controller.read(0, 512, newBuffer, callbackLocal); - - fail("An exception was supposed to be thrown"); - } - catch (ActiveMQException ignored) - { - System.out.println(ignored); - } - } - - { - final ByteBuffer newBuffer = AsynchronousFileImpl.newBuffer(512); - try - { - callbackLocal = new LocalCallback(); - controller.read(0, 512, newBuffer, callbackLocal); - waitForLatch(callbackLocal.latch); - assertFalse(callbackLocal.error); - - newBuffer.rewind(); - - byte[] bytesRead = new byte[SIZE]; - - newBuffer.get(bytesRead); - - for (int i = 0; i < SIZE; i++) - { - assertEquals((byte) (i % 100), bytesRead[i]); - } - } - finally - { - destroy(newBuffer); - } - } - } - - private static void destroy(ByteBuffer buffer0) - { - if (buffer0 != null) - { - AsynchronousFileImpl.destroyBuffer(buffer0); - } - } - - @Test - public void testBufferCallbackUniqueBuffers() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - final int NUMBER_LINES = 1000; - final int SIZE = 512; - - controller.open(fileName, 1000); - - controller.fill(0, 1, NUMBER_LINES * SIZE, (byte) 'j'); - - final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>(); - - BufferCallback bufferCallback = new BufferCallback() - { - public void bufferDone(final ByteBuffer buffer) - { - buffers.add(buffer); - } - }; - - controller.setBufferCallback(bufferCallback); - - CountDownLatch latch = new CountDownLatch(NUMBER_LINES); - ArrayList<Integer> result = new ArrayList<Integer>(); - for (int i = 0; i < NUMBER_LINES; i++) - { - ByteBuffer buffer1 = AsynchronousFileImpl.newBuffer(SIZE); - buffer1.rewind(); - for (int j = 0; j < SIZE; j++) - { - buffer1.put((byte) (j % Byte.MAX_VALUE)); - } - CountDownCallback aio = new CountDownCallback(latch, null, result, i); - controller.write(i * SIZE, SIZE, buffer1, aio); - } - - // The buffer callback is only called after the complete callback was - // called. - // Because of that a race could happen on the assertions to - // buffers.size what would invalidate the test - // We close the file and that would guarantee the buffer callback was - // called for all the elements - controller.close(); - - CountDownCallback.checkResults(NUMBER_LINES, result); - - // Make sure all the buffers are unique - ByteBuffer lineOne = null; - for (ByteBuffer bufferTmp : buffers) - { - if (lineOne == null) - { - lineOne = bufferTmp; - } - else - { - assertTrue(lineOne != bufferTmp); - } - } - - for (ByteBuffer bufferTmp : buffers) - { - destroy(bufferTmp); - } - - buffers.clear(); - } - - @Test - public void testBufferCallbackAwaysSameBuffer() throws Exception - { - - controller = new AsynchronousFileImpl(executor, pollerExecutor); - - final int NUMBER_LINES = 1000; - final int SIZE = 512; - - controller.open(fileName, 1000); - - controller.fill(0, 1, NUMBER_LINES * SIZE, (byte) 'j'); - - final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>(); - - BufferCallback bufferCallback = new BufferCallback() - { - public void bufferDone(final ByteBuffer buffer) - { - buffers.add(buffer); - } - }; - - controller.setBufferCallback(bufferCallback); - - CountDownLatch latch = new CountDownLatch(NUMBER_LINES); - - buffer = AsynchronousFileImpl.newBuffer(SIZE); - buffer.rewind(); - for (int j = 0; j < SIZE; j++) - { - buffer.put((byte) (j % Byte.MAX_VALUE)); - } - - ArrayList<Integer> result = new ArrayList<Integer>(); - - for (int i = 0; i < NUMBER_LINES; i++) - { - CountDownCallback aio = new CountDownCallback(latch, null, result, i); - controller.write(i * SIZE, SIZE, buffer, aio); - } - - // The buffer callback is only called after the complete callback was - // called. - // Because of that a race could happen on the assertions to - // buffers.size what would invalidate the test - // We close the file and that would guarantee the buffer callback was - // called for all the elements - controller.close(); - - CountDownCallback.checkResults(NUMBER_LINES, result); - - assertEquals(NUMBER_LINES, buffers.size()); - - // Make sure all the buffers are unique - ByteBuffer lineOne = null; - for (ByteBuffer bufferTmp : buffers) - { - if (lineOne == null) - { - lineOne = bufferTmp; - } - else - { - assertTrue(lineOne == bufferTmp); - } - } - - buffers.clear(); - } - - @Test - public void testRead() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - controller.setBufferCallback(bufferCallbackDestroy); - - final int NUMBER_LINES = 1000; - final int SIZE = 1024; - - controller.open(fileName, 1000); - - controller.fill(0, 1, NUMBER_LINES * SIZE, (byte) 'j'); - - { - CountDownLatch latch = new CountDownLatch(NUMBER_LINES); - ArrayList<Integer> result = new ArrayList<Integer>(); - - AtomicInteger errors = new AtomicInteger(0); - - for (int i = 0; i < NUMBER_LINES; i++) - { - if (i % 100 == 0) - { - System.out.println("Wrote " + i + " lines"); - } - final ByteBuffer buffer0 = AsynchronousFileImpl.newBuffer(SIZE); - for (int j = 0; j < SIZE; j++) - { - buffer0.put(ActiveMQTestBase.getSamplebyte(j)); - } - - CountDownCallback aio = new CountDownCallback(latch, errors, result, i); - controller.write(i * SIZE, SIZE, buffer0, aio); - } - - waitForLatch(latch); - - assertEquals(0, errors.get()); - - CountDownCallback.checkResults(NUMBER_LINES, result); - } - - // If you call close you're supposed to wait events to finish before - // closing it - controller.close(); - controller.setBufferCallback(null); - - controller.open(fileName, 10); - - buffer = AsynchronousFileImpl.newBuffer(SIZE); - - for (int i = 0; i < NUMBER_LINES; i++) - { - if (i % 100 == 0) - { - System.out.println("Read " + i + " lines"); - } - AsynchronousFileImpl.clearBuffer(buffer); - - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger errors = new AtomicInteger(0); - CountDownCallback aio = new CountDownCallback(latch, errors, null, 0); - - controller.read(i * SIZE, SIZE, buffer, aio); - - waitForLatch(latch); - assertEquals(0, errors.get()); - assertTrue(aio.doneCalled); - - byte[] bytesRead = new byte[SIZE]; - buffer.get(bytesRead); - - for (int count = 0; count < SIZE; count++) - { - Assert.assertEquals("byte position " + count + " differs on line " + i + " position = " + count, - ActiveMQTestBase.getSamplebyte(count), - bytesRead[count]); - } - } - } - - /** - * This test will call file.close() when there are still callbacks being processed. - * This could cause a crash or callbacks missing and this test is validating both situations. - * The file is also read after being written to validate its correctness - */ - @Test - public void testConcurrentClose() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - final int NUMBER_LINES = 1000; - CountDownLatch readLatch = new CountDownLatch(NUMBER_LINES); - final int SIZE = 1024; - - controller.open(fileName, 10000); - - controller.fill(0, 1, NUMBER_LINES * SIZE, (byte) 'j'); - - controller.setBufferCallback(bufferCallbackDestroy); - - for (int i = 0; i < NUMBER_LINES; i++) - { - ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE); - - buffer.clear(); - addString("Str value " + i + "\n", buffer); - for (int j = buffer.position(); j < buffer.capacity() - 1; j++) - { - buffer.put((byte) ' '); - } - buffer.put((byte) '\n'); - - CountDownCallback aio = new CountDownCallback(readLatch, null, null, 0); - controller.write(i * SIZE, SIZE, buffer, aio); - } - - // If you call close you're supposed to wait events to finish before - // closing it - controller.close(); - - controller.setBufferCallback(null); - - assertEquals(0, readLatch.getCount()); - waitForLatch(readLatch); - controller.open(fileName, 10); - - ByteBuffer newBuffer = AsynchronousFileImpl.newBuffer(SIZE); - - ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE); - - for (int i = 0; i < NUMBER_LINES; i++) - { - newBuffer.clear(); - addString("Str value " + i + "\n", newBuffer); - for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++) - { - newBuffer.put((byte) ' '); - } - newBuffer.put((byte) '\n'); - - CountDownLatch latch = new CountDownLatch(1); - CountDownCallback aio = new CountDownCallback(latch, null, null, 0); - controller.read(i * SIZE, SIZE, buffer, aio); - waitForLatch(latch); - assertEquals(0, aio.errorCalled); - assertTrue(aio.doneCalled); - - byte[] bytesRead = new byte[SIZE]; - byte[] bytesCompare = new byte[SIZE]; - - newBuffer.rewind(); - newBuffer.get(bytesCompare); - buffer.rewind(); - buffer.get(bytesRead); - - for (int count = 0; count < SIZE; count++) - { - assertEquals("byte position " + count + " differs on line " + i, - bytesCompare[count], - bytesRead[count]); - } - - assertTrue(buffer.equals(newBuffer)); - } - - destroy(newBuffer); - } - - private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - controller.open(fileName, aioLimit); - - - CountDownLatch latchDone = new CountDownLatch(numberOfLines); - - buffer = AsynchronousFileImpl.newBuffer(size); - encodeBufer(buffer); - - preAlloc(controller, numberOfLines * size); - - ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>(); - - ArrayList<Integer> result = new ArrayList<Integer>(); - - for (int i = 0; i < numberOfLines; i++) - { - list.add(new CountDownCallback(latchDone, null, result, i)); - } - - long valueInitial = System.currentTimeMillis(); - - long lastTime = System.currentTimeMillis(); - int counter = 0; - for (CountDownCallback tmp : list) - { - controller.write(counter * size, size, buffer, tmp); - if (++counter % 20000 == 0) - { - AsynchronousFileTest.debug(20000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)"); - lastTime = System.currentTimeMillis(); - } - - } - - ActiveMQTestBase.waitForLatch(latchDone); - - long timeTotal = System.currentTimeMillis() - valueInitial; - - CountDownCallback.checkResults(numberOfLines, result); - - AsynchronousFileTest.debug("After completions time = " + timeTotal + - " for " + - numberOfLines + - " registers " + - " size each line = " + - size + - ", Records/Sec=" + - numberOfLines * - 1000 / - timeTotal + - " (Assynchronous)"); - - for (CountDownCallback tmp : list) - { - assertEquals(1, tmp.timesDoneCalled.get()); - assertTrue(tmp.doneCalled); - assertEquals(0, tmp.errorCalled); - } - - controller.close(); - } - - @Test - public void testDirectSynchronous() throws Exception - { - - final int NUMBER_LINES = 3000; - final int SIZE = 1024; - - controller = new AsynchronousFileImpl(executor, pollerExecutor); - controller.open(fileName, 2000); - - buffer = AsynchronousFileImpl.newBuffer(SIZE); - encodeBufer(buffer); - - preAlloc(controller, NUMBER_LINES * SIZE); - - long startTime = System.currentTimeMillis(); - - for (int i = 0; i < NUMBER_LINES; i++) - { - CountDownLatch latchDone = new CountDownLatch(1); - CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0); - controller.write(i * 512, 512, buffer, aioBlock); - ActiveMQTestBase.waitForLatch(latchDone); - assertTrue(aioBlock.doneCalled); - assertEquals(0, aioBlock.errorCalled); - } - - long timeTotal = System.currentTimeMillis() - startTime; - AsynchronousFileTest.debug("time = " + timeTotal + - " for " + - NUMBER_LINES + - " registers " + - " size each line = " + - SIZE + - " Records/Sec=" + - NUMBER_LINES * - 1000 / - timeTotal + - " Synchronous"); - - controller.close(); - } - - - @Test - public void testInternalWrite() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - controller.open(fileName, 2000); - - final int SIZE = 10 * 512; - - buffer = AsynchronousFileImpl.newBuffer(SIZE); - - for (int i = 0; i < SIZE; i++) - { - buffer.put(getSamplebyte(i)); - } - - controller.writeInternal(0, SIZE, buffer); - - InputStream fileInput = new BufferedInputStream(new FileInputStream(new File(fileName))); - - for (int i = 0; i < SIZE; i++) - { - assertEquals(getSamplebyte(i), fileInput.read()); - } - - assertEquals(-1, fileInput.read()); - - } - - - @Test - public void testInvalidWrite() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - controller.open(fileName, 2000); - - final int SIZE = 512; - - buffer = AsynchronousFileImpl.newBuffer(SIZE); - encodeBufer(buffer); - - preAlloc(controller, 10 * 512); - - CountDownLatch latchDone = new CountDownLatch(1); - - CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0); - controller.write(11, 512, buffer, aioBlock); - - ActiveMQTestBase.waitForLatch(latchDone); - - assertTrue(aioBlock.errorCalled != 0); - assertFalse(aioBlock.doneCalled); - } - - @Test - public void testInvalidAlloc() throws Exception - { - try - { - @SuppressWarnings("unused") - ByteBuffer buffer = AsynchronousFileImpl.newBuffer(300); - fail("Exception expected"); - } - catch (Exception ignored) - { - } - - } - - // This is in particular testing for http://bugs.sun.com/view_bug.do?bug_id=6791815 - @Test - public void testAllocations() throws Exception - { - final AtomicInteger errors = new AtomicInteger(0); - - Thread[] ts = new Thread[100]; - - final CountDownLatch align = new CountDownLatch(ts.length); - final CountDownLatch start = new CountDownLatch(1); - - for (int i = 0; i < ts.length; i++) - { - ts[i] = new Thread() - { - @Override - public void run() - { - try - { - align.countDown(); - start.await(); - for (int j = 0; j < 1000; j++) - { - ByteBuffer buffer = AsynchronousFileImpl.newBuffer(512); - AsynchronousFileTest.destroy(buffer); - } - } - catch (Throwable e) - { - e.printStackTrace(); - errors.incrementAndGet(); - } - } - }; - ts[i].start(); - } - - align.await(); - start.countDown(); - - for (Thread t : ts) - { - t.join(); - } - - assertEquals(0, errors.get()); - } - - @Test - public void testSize() throws Exception - { - controller = new AsynchronousFileImpl(executor, pollerExecutor); - - final int NUMBER_LINES = 10; - final int SIZE = 1024; - - controller.open(fileName, 1); - - controller.fill(0, 1, NUMBER_LINES * SIZE, (byte) 'j'); - - assertEquals(NUMBER_LINES * SIZE, controller.size()); - } - - private static void addString(final String str, final ByteBuffer buffer) - { - CharBuffer charBuffer = CharBuffer.wrap(str); - AsynchronousFileTest.UTF_8_ENCODER.encode(charBuffer, buffer, true); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java index aa8422a..c9d3ce4 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.artemis.tests.unit.core.asyncio; -import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.core.asyncio.AIOCallback; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; -import org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFile; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.tests.unit.UnitTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; @@ -55,7 +55,7 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase static final int SIZE = 1024; - static final int NUMBER_OF_THREADS = 10; + static final int NUMBER_OF_THREADS = 1; static final int NUMBER_OF_LINES = 1000; @@ -65,7 +65,7 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase private static void debug(final String msg) { - UnitTestLogger.LOGGER.debug(msg); + UnitTestLogger.LOGGER.info(msg); } @Override @@ -102,16 +102,18 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase private void executeTest(final boolean sync) throws Throwable { MultiThreadAsynchronousFileTest.debug(sync ? "Sync test:" : "Async test"); - AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl(executor, pollerExecutor); - jlibAIO.open(fileName, 21000); + AIOSequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 21000); + factory.start(); + factory.disableBufferReuse(); + + AIOSequentialFile file = (AIOSequentialFile)factory.createSequentialFile(fileName); + file.open(); try { MultiThreadAsynchronousFileTest.debug("Preallocating file"); - jlibAIO.fill(0L, - MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS, - MultiThreadAsynchronousFileTest.SIZE * MultiThreadAsynchronousFileTest.NUMBER_OF_LINES, - (byte) 0); + file.fill(MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS * + MultiThreadAsynchronousFileTest.SIZE * MultiThreadAsynchronousFileTest.NUMBER_OF_LINES); MultiThreadAsynchronousFileTest.debug("Done Preallocating file"); CountDownLatch latchStart = new CountDownLatch(MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS + 1); @@ -119,7 +121,7 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase ArrayList<ThreadProducer> list = new ArrayList<ThreadProducer>(MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS); for (int i = 0; i < MultiThreadAsynchronousFileTest.NUMBER_OF_THREADS; i++) { - ThreadProducer producer = new ThreadProducer("Thread " + i, latchStart, jlibAIO, sync); + ThreadProducer producer = new ThreadProducer("Thread " + i, latchStart, file, sync); list.add(producer); producer.start(); } @@ -152,7 +154,8 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase } finally { - jlibAIO.close(); + file.close(); + factory.stop(); } } @@ -170,11 +173,11 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase boolean sync; - AsynchronousFileImpl libaio; + AIOSequentialFile libaio; public ThreadProducer(final String name, final CountDownLatch latchStart, - final AsynchronousFileImpl libaio, + final AIOSequentialFile libaio, final boolean sync) { super(name); @@ -190,10 +193,7 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase ByteBuffer buffer = null; - synchronized (MultiThreadAsynchronousFileTest.class) - { - buffer = AsynchronousFileImpl.newBuffer(MultiThreadAsynchronousFileTest.SIZE); - } + buffer = LibaioContext.newAlignedBuffer(MultiThreadAsynchronousFileTest.SIZE, 512); try { @@ -268,7 +268,7 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase { synchronized (MultiThreadAsynchronousFileTest.class) { - AsynchronousFileImpl.destroyBuffer(buffer); + LibaioContext.freeBuffer(buffer); } } @@ -281,44 +281,9 @@ public class MultiThreadAsynchronousFileTest extends AIOTestBase buffer.put(bytes); } - private void addData(final AsynchronousFileImpl aio, final ByteBuffer buffer, final AIOCallback callback) throws Exception - { - executor.execute(new WriteRunnable(aio, buffer, callback)); - } - - private class WriteRunnable implements Runnable + private void addData(final AIOSequentialFile aio, final ByteBuffer buffer, final IOCallback callback) throws Exception { - - AsynchronousFileImpl aio; - - ByteBuffer buffer; - - AIOCallback callback; - - public WriteRunnable(final AsynchronousFileImpl aio, final ByteBuffer buffer, final AIOCallback callback) - { - this.aio = aio; - this.buffer = buffer; - this.callback = callback; - } - - public void run() - { - try - { - aio.write(getNewPosition() * MultiThreadAsynchronousFileTest.SIZE, - MultiThreadAsynchronousFileTest.SIZE, - buffer, - callback); - - } - catch (Exception e) - { - callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.toString()); - e.printStackTrace(); - } - } - + aio.writeDirect(buffer, true, callback); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java index aa5c68b..e08ef55 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java @@ -37,8 +37,8 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.tests.unit.UnitTestLogger; @@ -100,7 +100,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase FakeSequentialFileFactory factory = new FakeSequentialFileFactory(200, true); - SequentialFile file = factory.createSequentialFile("test1", 1); + SequentialFile file = factory.createSequentialFile("test1"); file.open(); @@ -590,7 +590,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase System.out.println("Files = " + factory.listFiles("tt")); - SequentialFile file = factory.createSequentialFile("tt-1.tt", 1); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); @@ -656,7 +656,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase journalImpl.appendCommitRecord(2L, false); - SequentialFile file = factory.createSequentialFile("tt-1.tt", 1); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); @@ -761,7 +761,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase journalImpl.appendCommitRecord(1L, false); - SequentialFile file = factory.createSequentialFile("tt-1.tt", 1); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); @@ -1046,7 +1046,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase Assert.assertEquals(0, records.size()); Assert.assertEquals(1, transactions.size()); - SequentialFile file = factory.createSequentialFile("tt-1.tt", 1); + SequentialFile file = factory.createSequentialFile("tt-1.tt"); file.open(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java index e078043..c2f88ac 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java @@ -19,10 +19,10 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl; import java.io.File; import java.nio.ByteBuffer; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; @@ -44,7 +44,7 @@ public class CleanBufferTest extends ActiveMQTestBase @Test public void testCleanOnNIO() { - SequentialFileFactory factory = new NIOSequentialFileFactory(new File("Whatever")); + SequentialFileFactory factory = new NIOSequentialFileFactory(new File("Whatever"), 1); testBuffer(factory); } @@ -52,9 +52,9 @@ public class CleanBufferTest extends ActiveMQTestBase @Test public void testCleanOnAIO() { - if (AsynchronousFileImpl.isLoaded()) + if (LibaioContext.isLoaded()) { - SequentialFileFactory factory = new AIOSequentialFileFactory(new File("Whatever")); + SequentialFileFactory factory = new AIOSequentialFileFactory(new File("Whatever"), 50); testBuffer(factory); } @@ -70,6 +70,7 @@ public class CleanBufferTest extends ActiveMQTestBase private void testBuffer(final SequentialFileFactory factory) { + factory.start(); ByteBuffer buffer = factory.newBuffer(100); try @@ -107,6 +108,7 @@ public class CleanBufferTest extends ActiveMQTestBase finally { factory.releaseBuffer(buffer); + factory.stop(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java index aaffffa..eabb89e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeJournalImplTest.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; public class FakeJournalImplTest extends JournalImplTestUnit { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeSequentialFileFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeSequentialFileFactoryTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeSequentialFileFactoryTest.java index 7431233..aee1e06 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeSequentialFileFactoryTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FakeSequentialFileFactoryTest.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; public class FakeSequentialFileFactoryTest extends SequentialFileFactoryTestBase { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FileFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FileFactoryTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FileFactoryTestBase.java index 8eacec1..73bf148 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FileFactoryTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/FileFactoryTestBase.java @@ -15,15 +15,13 @@ * limitations under the License. */ package org.apache.activemq.artemis.tests.unit.core.journal.impl; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.junit.Before; - import java.nio.ByteBuffer; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; - -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.junit.Before; public abstract class FileFactoryTestBase extends ActiveMQTestBase { @@ -42,15 +40,15 @@ public abstract class FileFactoryTestBase extends ActiveMQTestBase // Protected --------------------------------- - protected void checkFill(final SequentialFile file, final int pos, final int size, final byte fillChar) throws Exception + protected void checkFill(final SequentialFile file, final int size) throws Exception { - file.fill(pos, size, fillChar); + file.fill(size); file.close(); file.open(); - file.position(pos); + file.position(0); ByteBuffer bb = ByteBuffer.allocateDirect(size); @@ -67,7 +65,7 @@ public abstract class FileFactoryTestBase extends ActiveMQTestBase for (int i = 0; i < size; i++) { // log.debug(" i is " + i); - Assert.assertEquals(fillChar, bytes[i]); + Assert.assertEquals(0, bytes[i]); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java index e340cf2..7e1347e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -30,7 +30,7 @@ import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TestableJournal; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -104,6 +104,10 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase protected void resetFileFactory() throws Exception { + if (fileFactory != null) + { + fileFactory.stop(); + } fileFactory = getFileFactory(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index ab86abc..b009092 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -25,7 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.tests.unit.UnitTestLogger; import org.apache.activemq.artemis.tests.util.RandomUtil; @@ -42,7 +42,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase for (String file : files) { - SequentialFile seqFile = fileFactory.createSequentialFile(file, 1); + SequentialFile seqFile = fileFactory.createSequentialFile(file); Assert.assertEquals(fileSize, seqFile.size()); } @@ -222,7 +222,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase for (String fileStr : files) { - SequentialFile file = fileFactory.createSequentialFile(fileStr, 1); + SequentialFile file = fileFactory.createSequentialFile(fileStr); ByteBuffer buffer = fileFactory.newBuffer(JournalImpl.SIZE_HEADER); @@ -284,7 +284,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase long fileID = Integer.MAX_VALUE; for (String fileStr : files) { - SequentialFile file = fileFactory.createSequentialFile(fileStr, 1); + SequentialFile file = fileFactory.createSequentialFile(fileStr); file.open(); @@ -2138,6 +2138,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase createJournal(); startJournal(); loadAndCheck(); + stopJournal(); } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java index b5b721e..858227a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/ReclaimerTest.java @@ -27,7 +27,7 @@ import java.util.Set; import org.junit.Assert; -import org.apache.activemq.artemis.core.journal.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.journal.impl.Reclaimer; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java index 469bdf2..60ca28d 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java @@ -23,9 +23,9 @@ import java.util.UUID; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.After; import org.junit.Assert; @@ -49,14 +49,14 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase @After public void tearDown() throws Exception { - Assert.assertEquals(0, AsynchronousFileImpl.getTotalMaxIO()); - factory.stop(); factory = null; ActiveMQTestBase.forceGC(); + Assert.assertEquals(0, LibaioContext.getTotalMaxIO()); + super.tearDown(); } @@ -86,7 +86,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase expectedFiles.add(fileName); - SequentialFile sf = factory.createSequentialFile(fileName, 1); + SequentialFile sf = factory.createSequentialFile(fileName); sf.open(); @@ -98,10 +98,10 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase // Create a couple with a different extension - they shouldn't be picked // up - SequentialFile sf1 = factory.createSequentialFile("different.file", 1); + SequentialFile sf1 = factory.createSequentialFile("different.file"); sf1.open(); - SequentialFile sf2 = factory.createSequentialFile("different.cheese", 1); + SequentialFile sf2 = factory.createSequentialFile("different.cheese"); sf2.open(); List<String> fileNames = factory.listFiles("amq"); @@ -132,22 +132,18 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase @Test public void testFill() throws Exception { - SequentialFile sf = factory.createSequentialFile("fill.amq", 1); + SequentialFile sf = factory.createSequentialFile("fill.amq"); sf.open(); try { - checkFill(sf, 0, 2048, (byte)'X'); - - checkFill(sf, 512, 512, (byte)'Y'); - - checkFill(sf, 0, 1, (byte)'Z'); + checkFill(sf, 2048); - checkFill(sf, 512, 1, (byte)'A'); + checkFill(sf, 512); - checkFill(sf, 1024, 512 * 4, (byte)'B'); + checkFill(sf, 512 * 4); } finally { @@ -158,11 +154,11 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase @Test public void testDelete() throws Exception { - SequentialFile sf = factory.createSequentialFile("delete-me.amq", 1); + SequentialFile sf = factory.createSequentialFile("delete-me.amq"); sf.open(); - SequentialFile sf2 = factory.createSequentialFile("delete-me2.amq", 1); + SequentialFile sf2 = factory.createSequentialFile("delete-me2.amq"); sf2.open(); @@ -189,7 +185,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase @Test public void testRename() throws Exception { - SequentialFile sf = factory.createSequentialFile("test1.amq", 1); + SequentialFile sf = factory.createSequentialFile("test1.amq"); sf.open(); @@ -222,7 +218,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase @Test public void testWriteandRead() throws Exception { - SequentialFile sf = factory.createSequentialFile("write.amq", 1); + SequentialFile sf = factory.createSequentialFile("write.amq"); sf.open(); @@ -291,14 +287,14 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase @Test public void testPosition() throws Exception { - SequentialFile sf = factory.createSequentialFile("position.amq", 1); + SequentialFile sf = factory.createSequentialFile("position.amq"); sf.open(); try { - sf.fill(0, 3 * 512, (byte)0); + sf.fill(3 * 512); String s1 = "orange"; byte[] bytes1 = s1.getBytes(StandardCharsets.UTF_8); @@ -376,11 +372,11 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase @Test public void testOpenClose() throws Exception { - SequentialFile sf = factory.createSequentialFile("openclose.amq", 1); + SequentialFile sf = factory.createSequentialFile("openclose.amq"); sf.open(); - sf.fill(0, 512, (byte)0); + sf.fill(512); String s1 = "cheesecake"; byte[] bytes1 = s1.getBytes(StandardCharsets.UTF_8); @@ -418,15 +414,15 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase return ActiveMQBuffers.wrappedBuffer(bytes); } - protected void checkFill(final SequentialFile file, final int pos, final int size, final byte fillChar) throws Exception + protected void checkFill(final SequentialFile file, final int size) throws Exception { - file.fill(pos, size, fillChar); + file.fill(size); file.close(); file.open(); - file.position(pos); + file.position(0); ByteBuffer bb = factory.newBuffer(size); @@ -437,7 +433,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase for (int i = 0; i < size; i++) { // log.debug(" i is " + i); - Assert.assertEquals(fillChar, bb.get(i)); + Assert.assertEquals(0, bb.get(i)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java index 76234b8..21866c6 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java @@ -16,11 +16,6 @@ */ package org.apache.activemq.artemis.tests.unit.core.journal.impl; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQBuffers; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.junit.Test; - import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -28,11 +23,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; - -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.impl.TimedBuffer; -import org.apache.activemq.artemis.core.journal.impl.TimedBufferObserver; +import org.junit.Test; public class TimedBufferTest extends ActiveMQTestBase { @@ -49,7 +47,7 @@ public class TimedBufferTest extends ActiveMQTestBase // Public -------------------------------------------------------- - IOAsyncTask dummyCallback = new IOAsyncTask() + IOCallback dummyCallback = new IOCallback() { public void done() @@ -69,7 +67,7 @@ public class TimedBufferTest extends ActiveMQTestBase final AtomicInteger flushTimes = new AtomicInteger(0); class TestObserver implements TimedBufferObserver { - public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOAsyncTask> callbacks) + public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) { buffers.add(buffer); flushTimes.incrementAndGet(); @@ -144,7 +142,7 @@ public class TimedBufferTest extends ActiveMQTestBase final AtomicInteger flushTimes = new AtomicInteger(0); class TestObserver implements TimedBufferObserver { - public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOAsyncTask> callbacks) + public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) { buffers.add(buffer); flushTimes.incrementAndGet(); @@ -235,7 +233,7 @@ public class TimedBufferTest extends ActiveMQTestBase { class TestObserver implements TimedBufferObserver { - public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOAsyncTask> callbacks) + public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) { } @@ -321,7 +319,7 @@ public class TimedBufferTest extends ActiveMQTestBase { class TestObserver implements TimedBufferObserver { - public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOAsyncTask> callbacks) + public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java index 59586fd..75a9535 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java @@ -27,12 +27,11 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; -import org.apache.activemq.artemis.core.asyncio.BufferCallback; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.TimedBuffer; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; public class FakeSequentialFileFactory implements SequentialFileFactory { @@ -63,9 +62,16 @@ public class FakeSequentialFileFactory implements SequentialFileFactory this(1, false); } + @Override + public int getMaxIO() + { + return 1; + } + + // Public -------------------------------------------------------- - public SequentialFile createSequentialFile(final String fileName, final int maxAIO) + public SequentialFile createSequentialFile(final String fileName) { FakeSequentialFile sf = fileMap.get(fileName); @@ -233,11 +239,11 @@ public class FakeSequentialFileFactory implements SequentialFileFactory final ByteBuffer bytes; - final IOAsyncTask callback; + final IOCallback callback; volatile boolean sendError; - CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOAsyncTask callback) + CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOCallback callback) { this.file = file; this.bytes = bytes; @@ -260,11 +266,6 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { callback.done(); } - - if (file.bufferCallback != null) - { - file.bufferCallback.bufferDone(bytes); - } } catch (Throwable e) { @@ -288,8 +289,6 @@ public class FakeSequentialFileFactory implements SequentialFileFactory private ByteBuffer data; - private BufferCallback bufferCallback; - public ByteBuffer getData() { return data; @@ -321,14 +320,6 @@ public class FakeSequentialFileFactory implements SequentialFileFactory notifyAll(); } - public synchronized void waitForClose() throws Exception - { - while (open) - { - this.wait(); - } - } - public void delete() { if (open) @@ -355,26 +346,21 @@ public class FakeSequentialFileFactory implements SequentialFileFactory checkAndResize(0); } - public void setBufferCallback(final BufferCallback callback) - { - bufferCallback = callback; - } - - public void fill(final int pos, final int size, final byte fillCharacter) throws Exception + public void fill(final int size) throws Exception { if (!open) { throw new IllegalStateException("Is closed"); } - checkAndResize(pos + size); + checkAndResize(size); // log.debug("size is " + size + " pos is " + pos); - for (int i = pos; i < size + pos; i++) + for (int i = 0; i < size; i++) { byte[] array = data.array(); - array[i] = fillCharacter; + array[i] = 0; // log.debug("Filling " + pos + " with char " + fillCharacter); } @@ -385,7 +371,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory return read(bytes, null); } - public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception + public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception { if (!open) { @@ -426,7 +412,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory return data.position(); } - public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) + public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback) { if (!open) { @@ -485,7 +471,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#writeInternal(java.nio.ByteBuffer) + * @see org.apache.activemq.artemis.core.io.SequentialFile#writeInternal(java.nio.ByteBuffer) */ public void writeInternal(ByteBuffer bytes) throws Exception { @@ -555,7 +541,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#renameTo(org.apache.activemq.artemis.core.journal.SequentialFile) + * @see org.apache.activemq.artemis.core.io.SequentialFile#renameTo(org.apache.activemq.artemis.core.io.SequentialFile) */ public void renameTo(final String newFileName) throws Exception { @@ -565,7 +551,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#fits(int) + * @see org.apache.activemq.artemis.core.io.SequentialFile#fits(int) */ public boolean fits(final int size) { @@ -573,21 +559,21 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#setBuffering(boolean) + * @see org.apache.activemq.artemis.core.io.SequentialFile#setBuffering(boolean) */ public void setBuffering(final boolean buffering) { } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#lockBuffer() + * @see org.apache.activemq.artemis.core.io.SequentialFile#lockBuffer() */ public void disableAutoFlush() { } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#unlockBuffer() + * @see org.apache.activemq.artemis.core.io.SequentialFile#unlockBuffer() */ public void enableAutoFlush() { @@ -599,9 +585,9 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#write(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer, boolean, org.apache.activemq.artemis.core.journal.IOCallback) + * @see org.apache.activemq.artemis.core.io.SequentialFile#write(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer, boolean, org.apache.activemq.artemis.core.journal.IOCallback) */ - public void write(final ActiveMQBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception + public void write(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception { bytes.writerIndex(bytes.capacity()); bytes.readerIndex(0); @@ -610,7 +596,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#write(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer, boolean) + * @see org.apache.activemq.artemis.core.io.SequentialFile#write(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer, boolean) */ public void write(final ActiveMQBuffer bytes, final boolean sync) throws Exception { @@ -620,9 +606,9 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#write(org.apache.activemq.artemis.core.journal.EncodingSupport, boolean, org.apache.activemq.artemis.core.journal.IOCompletion) + * @see org.apache.activemq.artemis.core.io.SequentialFile#write(org.apache.activemq.artemis.core.journal.EncodingSupport, boolean, org.apache.activemq.artemis.core.journal.IOCompletion) */ - public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) throws Exception + public void write(final EncodingSupport bytes, final boolean sync, final IOCallback callback) throws Exception { ByteBuffer buffer = newBuffer(bytes.getEncodeSize()); ActiveMQBuffer outbuffer = ActiveMQBuffers.wrappedBuffer(buffer); @@ -631,7 +617,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#write(org.apache.activemq.artemis.core.journal.EncodingSupport, boolean) + * @see org.apache.activemq.artemis.core.io.SequentialFile#write(org.apache.activemq.artemis.core.journal.EncodingSupport, boolean) */ public void write(final EncodingSupport bytes, final boolean sync) throws Exception { @@ -642,7 +628,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#exists() + * @see org.apache.activemq.artemis.core.io.SequentialFile#exists() */ public boolean exists() { @@ -652,14 +638,14 @@ public class FakeSequentialFileFactory implements SequentialFileFactory } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#setTimedBuffer(org.apache.activemq.artemis.core.journal.impl.TimedBuffer) + * @see org.apache.activemq.artemis.core.io.SequentialFile#setTimedBuffer(org.apache.activemq.artemis.core.io.buffer.TimedBuffer) */ public void setTimedBuffer(final TimedBuffer buffer) { } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.SequentialFile#copyTo(org.apache.activemq.artemis.core.journal.SequentialFile) + * @see org.apache.activemq.artemis.core.io.SequentialFile#copyTo(org.apache.activemq.artemis.core.io.SequentialFile) */ public void copyTo(SequentialFile newFileName) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java index 8c4ff47..5accdac 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java @@ -22,9 +22,9 @@ import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl; @@ -52,14 +52,14 @@ public class PageTest extends ActiveMQTestBase public void testPageWithNIO() throws Exception { recreateDirectory(getTestDir()); - testAdd(new NIOSequentialFileFactory(getTestDirfile()), 1000); + testAdd(new NIOSequentialFileFactory(getTestDirfile(), 1), 1000); } @Test public void testDamagedDataWithNIO() throws Exception { recreateDirectory(getTestDir()); - testDamagedPage(new NIOSequentialFileFactory(getTestDirfile()), 1000); + testDamagedPage(new NIOSequentialFileFactory(getTestDirfile(), 1), 1000); } @Test @@ -83,7 +83,7 @@ public class PageTest extends ActiveMQTestBase protected void testAdd(final SequentialFileFactory factory, final int numberOfElements) throws Exception { - SequentialFile file = factory.createSequentialFile("00010.page", 1); + SequentialFile file = factory.createSequentialFile("00010.page"); Page impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10); @@ -100,7 +100,7 @@ public class PageTest extends ActiveMQTestBase impl.sync(); impl.close(); - file = factory.createSequentialFile("00010.page", 1); + file = factory.createSequentialFile("00010.page"); file.open(); impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10); @@ -130,7 +130,7 @@ public class PageTest extends ActiveMQTestBase protected void testDamagedPage(final SequentialFileFactory factory, final int numberOfElements) throws Exception { - SequentialFile file = factory.createSequentialFile("00010.page", 1); + SequentialFile file = factory.createSequentialFile("00010.page"); Page impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10); @@ -172,7 +172,7 @@ public class PageTest extends ActiveMQTestBase impl.close(); - file = factory.createSequentialFile("00010.page", 1); + file = factory.createSequentialFile("00010.page"); file.open(); impl = new Page(new SimpleString("something"), new NullStorageManager(), factory, file, 10); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 1a657c1..3863c12 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -34,9 +34,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.SequentialFile; -import org.apache.activemq.artemis.core.journal.SequentialFileFactory; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -129,7 +129,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase public void testPageWithNIO() throws Exception { ActiveMQTestBase.recreateDirectory(getTestDir()); - testConcurrentPaging(new NIOSequentialFileFactory(new File(getTestDir())), 1); + testConcurrentPaging(new NIOSequentialFileFactory(new File(getTestDir()), 1), 1); } @Test @@ -565,7 +565,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase for (String file : files) { - SequentialFile fileTmp = factory.createSequentialFile(file, 1); + SequentialFile fileTmp = factory.createSequentialFile(file); fileTmp.open(); Assert.assertTrue("The page file size (" + fileTmp.size() + ") shouldn't be > " + MAX_SIZE, fileTmp.size() <= MAX_SIZE); @@ -645,7 +645,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase public void testRestartPage() throws Throwable { clearDataRecreateServerDirs(); - SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir())); + SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir()), 1); PagingStoreFactory storeFactory = new FakeStoreFactory(factory); @@ -682,7 +682,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase public void testOrderOnPaging() throws Throwable { clearDataRecreateServerDirs(); - SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir())); + SequentialFileFactory factory = new NIOSequentialFileFactory(new File(getPageDir()), 1); PagingStoreFactory storeFactory = new FakeStoreFactory(factory); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java index 3c1fb88..f89d61c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java @@ -25,7 +25,7 @@ import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.journal.impl.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; @@ -39,7 +39,7 @@ public class BatchIDGeneratorUnitTest extends ActiveMQTestBase @Test public void testSequence() throws Exception { - NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir())); + NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), 1); Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1); journal.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java index 8e14647..f4abbaa 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/OperationContextUnitTest.java @@ -23,8 +23,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.core.journal.IOAsyncTask; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.junit.Assert; import org.junit.Test; @@ -52,7 +52,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase final CountDownLatch latch1 = new CountDownLatch(1); final CountDownLatch latch2 = new CountDownLatch(1); - impl.executeOnCompletion(new IOAsyncTask() + impl.executeOnCompletion(new IOCallback() { public void onError(int errorCode, String errorMessage) @@ -70,7 +70,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase for (int i = 0; i < 10; i++) impl.storeLineUp(); for (int i = 0; i < 3; i++) impl.pageSyncLineUp(); - impl.executeOnCompletion(new IOAsyncTask() + impl.executeOnCompletion(new IOCallback() { public void onError(int errorCode, String errorMessage) @@ -213,7 +213,7 @@ public class OperationContextUnitTest extends ActiveMQTestBase final AtomicInteger operations = new AtomicInteger(0); // We should be up to date with lineUps and executions. this should now just finish processing - context.executeOnCompletion(new IOAsyncTask() + context.executeOnCompletion(new IOCallback() { public void done() http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java index 643e6c2..89c017b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/FileLockTest.java @@ -17,9 +17,9 @@ package org.apache.activemq.artemis.tests.unit.core.server.impl; import java.io.File; -import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl; import org.apache.activemq.artemis.core.server.impl.AIOFileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; +import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Before; import org.junit.Test; @@ -47,7 +47,7 @@ public class FileLockTest extends ActiveMQTestBase @Test public void testAIOLock() throws Exception { - if (AsynchronousFileImpl.isLoaded()) + if (LibaioContext.isLoaded()) { doTestLock(new AIOFileLockNodeManager(getTestDirfile(), false), new AIOFileLockNodeManager(getTestDirfile(), false)); }
