ARTEMIS-332 avoid shutting down the server after interrupted threads on paging
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7820fd6d Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7820fd6d Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7820fd6d Branch: refs/heads/master Commit: 7820fd6d609b57678a2b28f75e989350778d0291 Parents: 6b42f26 Author: Clebert Suconic <[email protected]> Authored: Thu Jan 7 16:07:30 2016 -0500 Committer: Clebert Suconic <[email protected]> Committed: Thu Jan 7 16:08:38 2016 -0500 ---------------------------------------------------------------------- .../artemis/core/io/nio/NIOSequentialFile.java | 34 ++++ .../artemis/core/paging/PagingStoreFactory.java | 2 - .../core/paging/impl/PagingStoreFactoryNIO.java | 4 - .../journal/NIOSequentialFileFactoryTest.java | 155 +++++++++++++++++++ .../core/paging/impl/PagingStoreImplTest.java | 4 - 5 files changed, 189 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7820fd6d/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 548b9a3..e60c21c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.Executor; import java.util.concurrent.Semaphore; @@ -93,6 +94,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { fileSize = channel.size(); } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; @@ -117,6 +121,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { channel.force(false); channel.position(0); } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; @@ -125,6 +132,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile { fileSize = channel.size(); } + public synchronized void waitForClose() throws InterruptedException { + while (isOpen()) { + wait(); + } + } + @Override public synchronized void close() throws IOException, InterruptedException, ActiveMQException { super.close(); @@ -145,6 +158,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { rfile.close(); } } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; @@ -178,6 +194,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { return bytesRead; } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { if (callback != null) { callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage()); @@ -195,6 +214,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { try { channel.force(false); } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; @@ -211,6 +233,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { try { return channel.size(); } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; @@ -223,6 +248,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { super.position(pos); channel.position(pos); } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; @@ -291,6 +319,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { try { doInternalWrite(bytes, sync, callback); } + catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); } @@ -306,6 +337,9 @@ public final class NIOSequentialFile extends AbstractSequentialFile { try { doInternalWrite(bytes, sync, callback); } + catch (ClosedChannelException e) { + ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); + } catch (IOException e) { ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7820fd6d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java index 91907ba..8c2d11a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java @@ -38,6 +38,4 @@ public interface PagingStoreFactory { SequentialFileFactory newFileFactory(SimpleString address) throws Exception; - void criticalException(Throwable e); - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7820fd6d/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index 39cd956..0b0d210 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -87,10 +87,6 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { // Public -------------------------------------------------------- - public void criticalException(Throwable e) { - critialErrorListener.onIOException(e, e.getMessage(), null); - } - @Override public void stop() { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7820fd6d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOSequentialFileFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOSequentialFileFactoryTest.java index dc72104..abadceb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOSequentialFileFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOSequentialFileFactoryTest.java @@ -17,10 +17,18 @@ package org.apache.activemq.artemis.tests.integration.journal; import java.io.File; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase; +import org.junit.Assert; +import org.junit.Test; public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase { @@ -29,4 +37,151 @@ public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase return new NIOSequentialFileFactory(new File(folder), true, 1); } + @Test + public void testInterrupts() throws Throwable { + + final EncodingSupport fakeEncoding = new EncodingSupport() { + @Override + public int getEncodeSize() { + return 10; + } + + @Override + public void encode(ActiveMQBuffer buffer) { + buffer.writeBytes(new byte[10]); + } + + @Override + public void decode(ActiveMQBuffer buffer) { + + } + }; + + final AtomicInteger calls = new AtomicInteger(0); + final NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + new Exception("shutdown").printStackTrace(); + calls.incrementAndGet(); + } + }, 1); + + Thread threadOpen = new Thread() { + public void run() { + try { + Thread.currentThread().interrupt(); + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadOpen.start(); + threadOpen.join(); + + Thread threadClose = new Thread() { + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + file.write(fakeEncoding, true); + Thread.currentThread().interrupt(); + file.close(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadClose.start(); + threadClose.join(); + + Thread threadWrite = new Thread() { + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + Thread.currentThread().interrupt(); + file.write(fakeEncoding, true); + file.close(); + + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadWrite.start(); + threadWrite.join(); + + Thread threadFill = new Thread() { + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + Thread.currentThread().interrupt(); + file.fill(1024); + file.close(); + + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadFill.start(); + threadFill.join(); + + Thread threadWriteDirect = new Thread() { + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + ByteBuffer buffer = ByteBuffer.allocate(10); + buffer.put(new byte[10]); + Thread.currentThread().interrupt(); + file.writeDirect(buffer, true); + file.close(); + + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadWriteDirect.start(); + threadWriteDirect.join(); + + Thread threadRead = new Thread() { + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + file.write(fakeEncoding, true); + file.position(0); + ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize()); + Thread.currentThread().interrupt(); + file.read(readBytes); + file.close(); + + } + catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadRead.start(); + threadRead.join(); + + // An interrupt exception shouldn't issue a shutdown + Assert.assertEquals(0, calls.get()); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7820fd6d/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 5f02cf9..59d2646 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -782,10 +782,6 @@ public class PagingStoreImplTest extends ActiveMQTestBase { static final class FakeStoreFactory implements PagingStoreFactory { - @Override - public void criticalException(Throwable e) { - } - final SequentialFileFactory factory; public FakeStoreFactory() {
