Jason918 opened a new issue #13688:
URL: https://github.com/apache/pulsar/issues/13688


   **Describe the bug**
   
   Sending a chunked message fails with `org.apache.pulsar.client.api.PulsarClientException$TimeoutException` when encryption is enabled.
   
   **To Reproduce**
   Run the following unit test in `org.apache.pulsar.client.api.SimpleProducerConsumerTest` on the master branch:
   ```
   @Test
   public void testCryptoWithChunking() throws Exception {
       final String topic =
               "persistent://my-property/my-ns/testCryptoWithChunking" + System.currentTimeMillis();
       final String ecdsaPublicKeyFile =
               "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
       final String ecdsaPrivateKeyFile =
               "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";

       // Small broker-side limit so that the 5100-byte payload must be chunked.
       this.conf.setMaxMessageSize(1000);

       @Cleanup
       PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);

       @Cleanup
       Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
               .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
       @Cleanup
       Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic)
               .enableChunking(true)
               .enableBatching(false)
               .addEncryptionKey("client-ecdsa.pem")
               .defaultCryptoKeyReader(ecdsaPublicKeyFile)
               .create();

       byte[] data = RandomUtils.nextBytes(5100);
       MessageId id = producer1.send(data);
       log.info("Message Id={}", id);

       MessageImpl<byte[]> message;
       message = (MessageImpl<byte[]>) consumer1.receive();
       Assert.assertEquals(message.getData(), data);
       Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 1);
   }
   ```
   
   **Expected behavior**
   The send succeeds.
   
   **Screenshots**
   NA
   
   **Desktop (please complete the following information):**
    - OS: NA
   
   **Additional context**
   
   The broker logs the following exception (at WARN level):
   
   > 
   > ... [pulsar-io-6-3] WARN  ServerCnx - [/127.0.0.1:62556] Got exception java.lang.IndexOutOfBoundsException: readerIndex(156) + length(8259) exceeds writerIndex(1316): PooledSlicedByteBuf(ridx: 156, widx: 1316, cap: 1316/1316, unwrapped: PooledUnsafeDirectByteBuf(ridx: 2533, widx: 4096, cap: 4096))
   >    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
   >    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
   >    at io.netty.buffer.AbstractByteBuf.skipBytes(AbstractByteBuf.java:971)
   >    at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:264)
   >    at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370)
   >    at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:437)
   >    at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:424)
   >    at org.apache.pulsar.broker.service.ServerCnx.printSendCommandDebug(ServerCnx.java:1420)
   >    at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1379)
   >    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   >    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   >    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
   >    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   >    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   >    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
   >    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
   >    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
   >    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   >    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   >    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   >    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   >    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
   >    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
   >    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
   >    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
   >    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
   >    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
   >    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
   >    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
   >    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
   >    at java.base/java.lang.Thread.run(Thread.java:829)
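
For readers unfamiliar with this kind of failure, the sketch below illustrates what the exception message is reporting. It is not Pulsar or Netty code: the class `SkipBoundsSketch` and its methods are hypothetical, and it assumes that skipping an unknown length-delimited field means reading a varint length prefix and then skipping that many bytes, as in the protobuf wire format. It only shows that a declared length of 8259 cannot be skipped from reader index 156 in a 1316-byte buffer; it does not establish why the metadata bytes were wrong in the first place.

```java
import java.nio.ByteBuffer;

public class SkipBoundsSketch {

    // Decodes a protobuf-style base-128 varint (least significant 7 bits first).
    static int readVarInt(ByteBuffer buf) {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            byte b = buf.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("varint too long");
    }

    // Hypothetical stand-in for the bounds check behind a "skip N bytes" call:
    // the skip is only legal if it stays within the written region of the buffer.
    static boolean canSkip(int readerIndex, int length, int writerIndex) {
        return length >= 0 && readerIndex + length <= writerIndex;
    }

    public static void main(String[] args) {
        // 0xC3 0x40 is the varint encoding of 8259, the length from the log line.
        ByteBuffer lengthPrefix = ByteBuffer.wrap(new byte[] {(byte) 0xC3, 0x40});
        int length = readVarInt(lengthPrefix);

        // Indices taken from the log: readerIndex(156), writerIndex(1316).
        System.out.println("declared length = " + length);
        System.out.println("can skip = " + canSkip(156, length, 1316));
    }
}
```

With the values from the log, 156 + 8259 = 8415, which is far past the writer index of 1316. That is consistent with the parser interpreting bytes that are not valid message metadata, although the issue itself does not confirm the cause.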
   
   



