This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.19.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 31560a729b66b7cec8acfd9ca5ced8699f6e5345 Author: Justin Bertram <[email protected]> AuthorDate: Tue Nov 2 14:52:17 2021 -0500 ARTEMIS-2293 addPacket in LargeMessageControllerImpl won't notifyAll for exception (cherry picked from commit 82645aa4e9e307b28351a10c0a450a2d387fa17e) --- .../client/impl/LargeMessageControllerImpl.java | 4 ++-- .../core/client/impl/LargeMessageBufferTest.java | 27 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index be2bb78..1cce479 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -169,14 +169,14 @@ public class LargeMessageControllerImpl implements LargeMessageController { flowControlCredit = flowControlSize; - notifyAll(); - if (streamEnded) { outStream.close(); } } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; + } finally { + notifyAll(); } } else { if (fileCache != null) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java index a9a39c1..06f5298 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java @@ -476,6 +476,33 @@ public class LargeMessageBufferTest extends ActiveMQTestBase { } @Test + public void testStreamDataWaitCompletionOnException() throws Exception { + final LargeMessageControllerImpl outBuffer = new LargeMessageControllerImpl(new FakeConsumerInternal(), 5, 5000); + + class FakeOutputStream extends OutputStream { + + @Override + public void write(int b) throws IOException { + throw new IOException(); + } + } + + outBuffer.setOutputStream(new FakeOutputStream()); + + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + outBuffer.waitCompletion(0); + fail("supposed to throw an exception"); + } catch (ActiveMQException e) { + latch.countDown(); + } + }).start(); + outBuffer.addPacket(RandomUtil.randomBytes(), 1, true); + assertTrue("The IOException should trigger an immediate failure", latch.await(3, TimeUnit.SECONDS)); + } + + @Test public void testStreamDataWaitCompletionOnSlowComingBuffer() throws Exception { final LargeMessageControllerImpl outBuffer = new LargeMessageControllerImpl(new FakeConsumerInternal(), 5, 1000);
