Jason918 opened a new issue #13688:
URL: https://github.com/apache/pulsar/issues/13688
**Describe the bug**
Sending a chunked message fails with
`org.apache.pulsar.client.api.PulsarClientException$TimeoutException` when
encryption is enabled.
**To Reproduce**
Run the following unit test in
`org.apache.pulsar.client.api.SimpleProducerConsumerTest` on the master branch.
```
@Test
public void testCryptoWithChunking() throws Exception {
    final String topic = "persistent://my-property/my-ns/testCryptoWithChunking"
            + System.currentTimeMillis();
    final String ecdsaPublicKeyFile =
            "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
    final String ecdsaPrivateKeyFile =
            "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";

    // Lower the max message size so that the 5100-byte payload below must be chunked.
    this.conf.setMaxMessageSize(1000);

    @Cleanup
    PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);

    // The consumer decrypts with the private key.
    @Cleanup
    Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("sub1")
            .defaultCryptoKeyReader(ecdsaPrivateKeyFile)
            .subscribe();

    // The producer has chunking and encryption enabled and batching disabled.
    @Cleanup
    Producer<byte[]> producer1 = pulsarClient.newProducer()
            .topic(topic)
            .enableChunking(true)
            .enableBatching(false)
            .addEncryptionKey("client-ecdsa.pem")
            .defaultCryptoKeyReader(ecdsaPublicKeyFile)
            .create();

    byte[] data = RandomUtils.nextBytes(5100);
    // This send is the call that fails with the TimeoutException.
    MessageId id = producer1.send(data);
    log.info("Message Id={}", id);

    MessageImpl<byte[]> message = (MessageImpl<byte[]>) consumer1.receive();
    Assert.assertEquals(message.getData(), data);
    Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 1);
}
```
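For orientation: the test caps the message size at 1000 bytes and sends 5100 bytes, so the producer has to split the payload. Below is a rough sketch of that arithmetic, assuming each chunk carries at most `maxMessageSize` bytes of payload. The real client may reserve room for metadata, and encryption may change the payload size, so the actual chunk count can differ. `ChunkCountSketch` and `chunkCount` are hypothetical names used only for illustration, not Pulsar APIs.

```java
class ChunkCountSketch {
    // Hypothetical helper: ceiling division of the payload size by the per-chunk limit.
    // Assumption: every chunk may hold up to maxChunkBytes bytes of payload.
    public static int chunkCount(int payloadBytes, int maxChunkBytes) {
        if (payloadBytes < 0 || maxChunkBytes <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative / positive");
        }
        // An empty payload is still sent as a single message.
        return Math.max(1, (payloadBytes + maxChunkBytes - 1) / maxChunkBytes);
    }

    public static void main(String[] args) {
        // Values taken from the test above: 5100-byte payload, 1000-byte limit.
        System.out.println(chunkCount(5100, 1000)); // prints 6
    }
}
```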
**Expected behavior**
The message is sent successfully.
**Screenshots**
NA
**Desktop (please complete the following information):**
- OS: NA
**Additional context**
The broker logs the following exception (at WARN level):
> ... [pulsar-io-6-3] WARN ServerCnx - [/127.0.0.1:62556] Got exception java.lang.IndexOutOfBoundsException: readerIndex(156) + length(8259) exceeds writerIndex(1316): PooledSlicedByteBuf(ridx: 156, widx: 1316, cap: 1316/1316, unwrapped: PooledUnsafeDirectByteBuf(ridx: 2533, widx: 4096, cap: 4096))
> at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> at io.netty.buffer.AbstractByteBuf.skipBytes(AbstractByteBuf.java:971)
> at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:264)
> at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370)
> at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:437)
> at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:424)
> at org.apache.pulsar.broker.service.ServerCnx.printSendCommandDebug(ServerCnx.java:1420)
> at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1379)
> at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
> at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
> at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
> at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> at java.base/java.lang.Thread.run(Thread.java:829)
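
Reading the exception message: the metadata parser tried to skip 8259 bytes while the buffer had only 1316 - 156 = 1160 readable bytes left. The sketch below reproduces that bounds check with plain integers. It is a simplified stand-in for illustration, not Netty's actual implementation, and `ReadableBytesSketch`, `readableBytes`, and `checkSkip` are hypothetical names.

```java
class ReadableBytesSketch {
    // Bytes still available to read: everything between the reader and writer index.
    public static int readableBytes(int readerIndex, int writerIndex) {
        return writerIndex - readerIndex;
    }

    // Simplified version of the check that fails in the log: skipping `length`
    // bytes is only allowed if it does not run past the writer index.
    public static void checkSkip(int readerIndex, int writerIndex, int length) {
        if ((long) readerIndex + length > writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d)",
                    readerIndex, length, writerIndex));
        }
    }

    public static void main(String[] args) {
        // Values taken from the broker log above.
        System.out.println(readableBytes(156, 1316)); // prints 1160
        try {
            checkSkip(156, 1316, 8259);
        } catch (IndexOutOfBoundsException e) {
            // prints readerIndex(156) + length(8259) exceeds writerIndex(1316)
            System.out.println(e.getMessage());
        }
    }
}
```

A requested length that large relative to the remaining bytes suggests the parser read a field length that does not match the frame's actual contents, though the log alone does not establish the cause.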