kjothen opened a new issue, #18349: URL: https://github.com/apache/pulsar/issues/18349
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

- OS = macOS Ventura (13.0)
- Pulsar = 2.10.2

### Minimal reproduce step

Consume an encrypted message with a `Consumer` configured with `cryptoFailureAction = CONSUME`, using either a mismatched or a missing private key. The client hits an internal exception, disconnects, and retries in an infinite loop, so the `receive()` call never returns.

### What did you expect to see?

I expected `receive()` to return the message with its encrypted byte buffer, as described in the documentation: [Decrypting encrypted messages at the consumer application](https://pulsar.apache.org/docs/cookbooks-encryption/#decrypting-encrypted-messages-at-the-consumer-application)

### What did you see instead?

Two types of exception:

1. If the consumer's implementation of `CryptoKeyReader.getPrivateKey` returns `null`, the consumer throws the following exception and, after auto-reconnecting, gets caught in an infinite retry loop:

```
20:50:50.606 [pulsar-client-io-79-1] ERROR o.a.p.c.impl.crypto.MessageCryptoBc - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Failed to decrypt data key tenant-key-1 to decrypt messages Cannot invoke "org.apache.pulsar.client.api.EncryptionKeyInfo.getKey()" because "keyInfo" is null
20:50:50.606 [pulsar-client-io-79-1] WARN o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber][d7a64][org.apache.pulsar.common.api.proto.MessageIdData@6596540d] Decryption failed. Consuming encrypted message since config is set to consume.
20:50:50.607 [pulsar-client-io-79-1] WARN o.a.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:50561] Got exception java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
    at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
    at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470)
    at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1719)
    at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1127)
    at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:1520)
    at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:1369)
    at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:459)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:187)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1589)
20:50:50.710 [pulsar-client-io-79-1] INFO o.a.pulsar.client.impl.ClientCnx - [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] Disconnected
20:50:50.710 [pulsar-client-io-79-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Closed connection [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] -- Will try again in 0.1 s
20:50:50.711 [pulsar-client-io-79-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Closed connection [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] -- Will try again in 0.1 s
20:50:50.711 [pulsar-client-io-79-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-1-subscriber] Closed connection [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] -- Will try again in 0.1 s
20:50:50.813 [pulsar-timer-83-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Reconnecting after timeout
20:50:50.813 [pulsar-timer-83-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Reconnecting after timeout
20:50:50.816 [pulsar-client-io-79-1] INFO o.a.p.client.impl.ConnectionPool - [[id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561]] Connected to server
20:50:50.816 [pulsar-client-io-79-1] INFO o.a.pulsar.client.impl.ClientCnx - [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561] Connected through proxy to target broker at localhost:6650
20:50:50.819 [pulsar-client-io-79-1] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber] Subscribing to topic on cnx [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561], consumerId 0
20:50:50.819 [pulsar-client-io-79-1] INFO o.a.pulsar.client.impl.ProducerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Creating producer on cnx [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561]
20:50:50.832 [pulsar-timer-83-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-1-subscriber] Reconnecting after timeout
20:50:50.832 [pulsar-client-io-79-1] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber] Subscribed to topic on localhost/127.0.0.1:50561 -- consumer: 0
20:50:50.834 [pulsar-client-io-79-1] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-1-subscriber] Subscribing to topic on cnx [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561], consumerId 1
20:50:50.839 [pulsar-client-io-79-1] ERROR o.a.p.c.impl.crypto.MessageCryptoBc - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Failed to decrypt data key tenant-key-1 to decrypt messages Cannot invoke "org.apache.pulsar.client.api.EncryptionKeyInfo.getKey()" because "keyInfo" is null
20:50:50.852 [pulsar-client-io-79-1] WARN o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber][d7a64][org.apache.pulsar.common.api.proto.MessageIdData@1b2135c9] Decryption failed. Consuming encrypted message since config is set to consume.
20:50:50.853 [pulsar-client-io-79-1] WARN o.a.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:50561] Got exception java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
    at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
    at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470)
    at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1719)
    at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1127)
    at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:1520)
    at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:1369)
    at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:459)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:187)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
...
```

2. If the consumer's implementation of `CryptoKeyReader.getPrivateKey` returns a mismatched key (any other key), the consumer throws the following exception and, after auto-reconnecting, gets caught in an infinite retry loop:

```
20:42:38.220 [pulsar-client-io-35-1] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber] Subscribed to topic on localhost/127.0.0.1:50512 -- consumer: 0
2022-11-04T20:42:38.224Z Kierans-iMac.local INFO [com.repldriven.mono.pulsar.crypto:?] - Trying to read private key: find tenant-key-1 in ("tenant-key-1")
20:42:38.227 [pulsar-client-io-35-1] ERROR o.a.p.c.impl.crypto.MessageCryptoBc - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Failed to decrypt data key tenant-key-1 to decrypt messages unable to process block
20:42:38.227 [pulsar-client-io-35-1] WARN o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber][c585c][org.apache.pulsar.common.api.proto.MessageIdData@31a4eea0] Decryption failed. Consuming encrypted message since config is set to consume.
20:42:38.227 [pulsar-client-io-35-1] WARN o.a.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:50512] Got exception java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4
    at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
    at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470)
    at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1719)
    at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1127)
    at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:1520)
    at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:1369)
    at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:459)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:187)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1589)
20:42:38.276 [pulsar-client-io-35-1] INFO o.a.pulsar.client.impl.ClientCnx - [id: 0xe7e22a30, L:/127.0.0.1:50521 ! R:localhost/127.0.0.1:50512] Disconnected
20:42:38.291 [pulsar-timer-39-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-1-subscriber] Reconnecting after connection was closed
20:42:38.291 [pulsar-client-io-35-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Closed connection [id: 0xe7e22a30, L:/127.0.0.1:50521 ! R:localhost/127.0.0.1:50512] -- Will try again in 0.1 s
20:42:38.291 [pulsar-client-io-35-1] INFO o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Closed connection [id: 0xe7e22a30, L:/127.0.0.1:50521 ! R:localhost/127.0.0.1:50512] -- Will try again in 0.1 s
20:42:38.295 [pulsar-client-io-35-1] INFO o.a.p.client.impl.ConnectionPool - [[id: 0x3a71b0d2, L:/127.0.0.1:50522 - R:localhost/127.0.0.1:50512]] Connected to server
20:42:38.295 [pulsar-client-io-35-1] INFO o.a.pulsar.client.impl.ClientCnx - [id: 0x3a71b0d2, L:/127.0.0.1:50522 - R:localhost/127.0.0.1:50512] Connected through proxy to target broker at localhost:6650
20:42:38.309 [pulsar-client-io-35-1] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-1-subscriber] Subscribing to topic on cnx [id: 0x3a71b0d2, L:/127.0.0.1:50522 - R:localhost/127.0.0.1:50512], consumerId 1
20:42:38.318 [pulsar-client-io-35-1] INFO o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-1-subscriber] Subscribed to topic on localhost/127.0.0.1:50512 -- consumer: 1
...
```

### Anything else?

I could not find any existing issue that mentions `cryptoFailureAction=CONSUME`.
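In both traces the exception is raised in `LightProtoCodec.skipUnknownField`, called from `SingleMessageMetadata.parseFrom` via `ConsumerImpl.receiveIndividualMessagesFromBatch`, immediately after the WARN saying the encrypted message is being consumed. That pattern looks consistent with the client trying to split the still-encrypted batch payload into individual messages, although this is an inference from the logs, not something verified against the client source. The stdlib-only Java sketch below is a simplified model, not the LightProto implementation, of why arbitrary bytes trip a wire-type check: the low three bits of a protobuf field tag encode the wire type, and in this sketch (as the `tag type: 4` and `tag type: 6` errors above suggest for the client's decoder) only wire types 0, 1, 2 and 5 can be skipped. The class and method names (`WireTypeSketch`, `checkFirstTag`, `countRejectedFirstBytes`) are hypothetical.

```java
/**
 * Simplified, hypothetical model of a protobuf-style decoder meeting bytes it
 * cannot parse. Assumption: this is NOT the LightProto code used by the Pulsar
 * client; it only mimics rejecting unknown fields with wire types 3, 4, 6 or 7.
 */
public class WireTypeSketch {

    // Decode a base-128 varint from the start of the buffer (how field tags are encoded).
    static int readVarInt(byte[] buf) {
        int result = 0;
        for (int i = 0; i < buf.length && i < 5; i++) {
            int b = buf[i] & 0xFF;
            result |= (b & 0x7F) << (7 * i);
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("Malformed varint");
    }

    // A tag is (fieldNumber << 3) | wireType. Returns the wire type if the sketch
    // knows how to skip it; otherwise throws, similar to the client log above.
    static int checkFirstTag(byte[] buf) {
        int wireType = readVarInt(buf) & 0x7;
        switch (wireType) {
            case 0: // varint
            case 1: // fixed64
            case 2: // length-delimited
            case 5: // fixed32
                return wireType;
            default: // 3 and 4 (groups), 6 and 7 (undefined)
                throw new IllegalArgumentException("Invalid unknown tag type: " + wireType);
        }
    }

    // Count how many of the 256 possible first bytes are rejected outright.
    static int countRejectedFirstBytes() {
        int rejected = 0;
        for (int b = 0; b < 256; b++) {
            try {
                // The trailing 0x01 terminates the varint when b has its continuation bit set.
                checkFirstTag(new byte[] {(byte) b, 0x01});
            } catch (IllegalArgumentException e) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 0x0A = field 1, wire type 2 (length-delimited): a well-formed tag.
        System.out.println("0x0A -> wire type " + checkFirstTag(new byte[] {0x0A, 0x00}));
        System.out.println("Rejected first bytes: " + countRejectedFirstBytes() + " of 256");
        try {
            checkFirstTag(new byte[] {0x0E}); // field 1, wire type 6
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Invalid unknown tag type: 6
        }
    }
}
```

Since the wire type depends only on the low three bits of the first tag byte, half of all possible first bytes (128 of 256) are rejected immediately. The sketch only inspects the first tag; a real decoder fed ciphertext may instead match a known field number first, or fail later with a different error.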
The tests in [SimpleProducerConsumerTest.java](https://github.com/apache/pulsar/blob/master/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java) do not cover this use case: for the CONSUME failure action, they only test a consumer that supplies no `cryptoKeyReader` implementation at all. This bug concerns the more likely scenario, where the consumer has a `cryptoKeyReader` but it does not return the right key.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
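For the mismatched-key scenario, the first failure (`Failed to decrypt data key tenant-key-1 to decrypt messages unable to process block`) is expected. Per Pulsar's encryption documentation, the producer encrypts each message with a generated symmetric AES data key and encrypts that data key with the consumer's public key, so a private key from any other pair cannot recover it, even when the key name (`tenant-key-1`) matches. The stdlib-only Java sketch below illustrates only that data-key step, under stated assumptions: it uses RSA-OAEP from the JDK's SunJCE provider rather than the BouncyCastle code in `MessageCryptoBc`, so exception types and messages differ, and the class and method names are hypothetical. With `cryptoFailureAction = CONSUME` this failure should be tolerated; the reported bug is what the client does after it.

```java
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

/**
 * Hypothetical envelope-encryption sketch using only JDK providers.
 * Assumption: RSA-OAEP via SunJCE stands in for Pulsar's BouncyCastle-based
 * MessageCryptoBc, so exception types and messages will differ.
 */
public class DataKeyUnwrapSketch {

    private static final String RSA_OAEP = "RSA/ECB/OAEPWithSHA-1AndMGF1Padding";

    static KeyPair newRsaKeyPair() throws GeneralSecurityException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
        generator.initialize(2048);
        return generator.generateKeyPair();
    }

    // Producer side: encrypt the symmetric data key with the consumer's public key.
    static byte[] wrapDataKey(SecretKey dataKey, PublicKey consumerPublicKey)
            throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance(RSA_OAEP);
        cipher.init(Cipher.ENCRYPT_MODE, consumerPublicKey);
        return cipher.doFinal(dataKey.getEncoded());
    }

    // Consumer side: decrypt the data key with whatever private key the key reader returned.
    static byte[] unwrapDataKey(byte[] wrappedKey, PrivateKey readerPrivateKey)
            throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance(RSA_OAEP);
        cipher.init(Cipher.DECRYPT_MODE, readerPrivateKey);
        return cipher.doFinal(wrappedKey);
    }

    // Returns true only if the reader's private key recovers the original data key.
    static boolean roundTrip(boolean readerHasMatchingKey) {
        try {
            KeyGenerator aes = KeyGenerator.getInstance("AES");
            aes.init(128);
            SecretKey dataKey = aes.generateKey();

            KeyPair consumerKeys = newRsaKeyPair();
            // A mismatched reader key is simulated by an unrelated RSA key pair.
            KeyPair readerKeys = readerHasMatchingKey ? consumerKeys : newRsaKeyPair();

            byte[] wrapped = wrapDataKey(dataKey, consumerKeys.getPublic());
            try {
                byte[] recovered = unwrapDataKey(wrapped, readerKeys.getPrivate());
                return Arrays.equals(recovered, dataKey.getEncoded());
            } catch (GeneralSecurityException e) {
                // With the wrong private key, the OAEP padding check fails (BadPaddingException).
                return false;
            }
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("JDK crypto setup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("matching key recovers data key: " + roundTrip(true));
        System.out.println("mismatched key recovers data key: " + roundTrip(false));
    }
}
```

The unwrap failure is caught and reported as `false` rather than rethrown, mirroring the idea that a consumer configured to CONSUME should carry on with the still-encrypted payload instead of failing.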
