This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 82645aa ARTEMIS-2293 addPacket in LargeMessageControllerImpl won't
notifyAll for exception
82645aa is described below
commit 82645aa4e9e307b28351a10c0a450a2d387fa17e
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Nov 2 14:52:17 2021 -0500
ARTEMIS-2293 addPacket in LargeMessageControllerImpl won't notifyAll for
exception
---
.../client/impl/LargeMessageControllerImpl.java | 4 ++--
.../core/client/impl/LargeMessageBufferTest.java | 27 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index 4b9ba0c..a98ca81 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -162,14 +162,14 @@ public class LargeMessageControllerImpl implements
LargeMessageController {
flowControlCredit = flowControlSize;
- notifyAll();
-
if (streamEnded) {
outStream.close();
}
} catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
+ } finally {
+ notifyAll();
}
} else {
if (fileCache != null) {
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
index 54a9bdc..79fe5f3 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java
@@ -468,6 +468,33 @@ public class LargeMessageBufferTest extends
ActiveMQTestBase {
}
@Test
+ public void testStreamDataWaitCompletionOnException() throws Exception {
+ final LargeMessageControllerImpl outBuffer = new
LargeMessageControllerImpl(new FakeConsumerInternal(), 5, 5000);
+
+ class FakeOutputStream extends OutputStream {
+
+ @Override
+ public void write(int b) throws IOException {
+ throw new IOException();
+ }
+ }
+
+ outBuffer.setOutputStream(new FakeOutputStream());
+
+ CountDownLatch latch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ outBuffer.waitCompletion(0);
+ fail("supposed to throw an exception");
+ } catch (ActiveMQException e) {
+ latch.countDown();
+ }
+ }).start();
+ outBuffer.addPacket(RandomUtil.randomBytes(), 1, true);
+ assertTrue("The IOException should trigger an immediate failure",
latch.await(3, TimeUnit.SECONDS));
+ }
+
+ @Test
public void testStreamDataWaitCompletionOnSlowComingBuffer() throws
Exception {
final LargeMessageControllerImpl outBuffer = new
LargeMessageControllerImpl(new FakeConsumerInternal(), 5, 1000);