[
https://issues.apache.org/jira/browse/ARTEMIS-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013402#comment-17013402
]
Riyafa Abdul Hameed commented on ARTEMIS-2325:
----------------------------------------------
The handler is the lambda expression in the following line in the code snippet:
producer.send(message, message1 ->
System.out.println(message1.getBodyBuffer().readString()));
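
The IndexOutOfBoundsException in the quoted trace reports that a 4-byte read (readInt) was attempted at reader index 22 while the writer index was also 22, so there was nothing left to read from that view of the buffer. The sketch below reproduces that failure mode in isolation. It is only an illustration under stated assumptions: java.nio.ByteBuffer stands in for the Netty-backed body buffer, and the class BufferIndexSketch and its helpers are hypothetical names, not Artemis code. Whether this is the actual cause in ARTEMIS-2325 is not confirmed in this thread.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferIndexSketch {

    // Writes a length-prefixed UTF-8 string (hypothetical stand-in for writeString).
    static void writeString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    // Reads a length-prefixed string and advances the buffer's read position.
    static String readString(ByteBuffer buf) {
        int length = buf.getInt(); // throws if fewer than 4 bytes remain
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns true when reading from an already consumed view fails.
    static boolean secondReadFails(ByteBuffer view) {
        try {
            readString(view);
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.allocate(64);
        writeString(body, "Hello 0");
        body.flip(); // switch from writing to reading

        // A view whose position is shared by every reader of that view.
        ByteBuffer shared = body.duplicate();
        System.out.println(readString(shared));      // first read succeeds
        System.out.println(secondReadFails(shared)); // position has reached the limit

        // A fresh duplicate has its own position, so the read succeeds again.
        ByteBuffer fresh = body.duplicate();
        System.out.println(readString(fresh));
    }
}
```

If the handler only needs the payload for logging, capturing the string in the lambda at send time instead of reading it back from the message body would sidestep any dependence on the buffer's read position.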
> SendAcknowledgementHandler when multiple messages are sent
> ----------------------------------------------------------
>
> Key: ARTEMIS-2325
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2325
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Environment: Using maven artifact version artemis-core-client 2.7.0
> Reporter: Riyafa Abdul Hameed
> Priority: Major
>
> When I try to send multiple messages while using a
> SendAcknowledgementHandler, the following code fails:
> {code:java}
> import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientProducer;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
> import org.apache.activemq.artemis.api.core.client.ServerLocator;
> public class ProducerInvalid {
>     public static void main(String[] args) throws Exception {
>         ServerLocator locator =
>                 ActiveMQClient.createServerLocator("tcp://localhost:61616");
>         locator.setConfirmationWindowSize(10240);
>         ClientSessionFactory factory = locator.createSessionFactory();
>         ClientSession session = factory.createSession();
>         // A producer is associated with an address ...
>         ClientProducer producer = session.createProducer("example");
>         for (int i = 0; i < 1000000; i++) {
>             ClientMessage message = session.createMessage(true);
>             message.getBodyBuffer().writeString("Hello " + i);
>             producer.send(message, message1 ->
>                     System.out.println(message1.getBodyBuffer().readString()));
>         }
>     }
> }{code}
> The exception thrown is as follows:
> {code:java}
> Apr 29, 2019 11:08:44 AM
> org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler
> bufferReceived
> ERROR: AMQ214031: Failed to decode buffer, disconnect immediately.
> java.lang.IllegalStateException: java.lang.IndexOutOfBoundsException:
> readerIndex(22) + length(4) exceeds writerIndex(22):
> UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped:
> UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx:
> 70, cap: 1500))
> at
> org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:381)
> at
> org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1191)
> at
> org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.channelRead(ActiveMQChannelHandler.java:73)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
> at
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
> at
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
> at
> io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
> at
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
> at
> io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796)
> at
> io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
> at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
> at
> org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4)
> exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap:
> 1500, unwrapped:
> UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx:
> 70, cap: 1500))
> at
> io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1428)
> at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:802)
> at io.netty.buffer.WrappedByteBuf.readInt(WrappedByteBuf.java:571)
> at
> org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:92)
> at
> org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
> at
> org.riyafa.mytests.ProducerInvalid.lambda$main$0(ProducerInvalid.java:23)
> at
> org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.callSendAck(ActiveMQSessionContext.java:232)
> at
> org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.handleResponse(ActiveMQSessionContext.java:220)
> at
> org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$1.commandConfirmed(ActiveMQSessionContext.java:203)
> at
> org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:755)
> at
> org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:693)
> at
> org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:399)
> at
> org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:376)
> ... 21 more{code}
> I am not sure whether SendAcknowledgementHandler is supposed to be used in
> the above manner.
> The following works fine:
> {code:java}
> import org.apache.activemq.artemis.api.core.client.*;
> public class Producer {
>     public static void main(String[] args) throws Exception {
>         ServerLocator locator =
>                 ActiveMQClient.createServerLocator("tcp://localhost:61616");
>         locator.setConfirmationWindowSize(10240);
>         ClientSessionFactory factory = locator.createSessionFactory();
>         ClientSession session = factory.createSession();
>         session.setSendAcknowledgementHandler(message1 ->
>                 System.out.println(message1.getBodyBuffer().readString()));
>         // A producer is associated with an address ...
>         ClientProducer producer = session.createProducer("example");
>         for (int i = 0; i < 1000000; i++) {
>             ClientMessage message = session.createMessage(true);
>             message.getBodyBuffer().writeString("Hello " + i);
>             producer.send(message);
>         }
>     }
> }{code}
> But I would like to have an acknowledgment handler per send. Is it not
> possible?