This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e2778b26eff9134bcde1fbe2e84aaca86edfceb5 Author: JiangHaiting <[email protected]> AuthorDate: Wed Feb 23 12:03:07 2022 +0800 Fix Field 'consumer_epoch' is not set in ServerCnx (#14410) ### Motivation Test case `SimpleProducerConsumerTest#testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause` fails when debug logging is enabled. The root cause is that `redeliver.getConsumerEpoch()` is called in the debug log statement without first checking whether the field is set. ``` 2022-02-22T13:13:09,216+0800 [pulsar-io-6-1] WARN ServerCnx - [/127.0.0.1:64428] Got exception java.lang.IllegalStateException: Field 'consumer_epoch' is not set at org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages.getConsumerEpoch(CommandRedeliverUnacknowledgedMessages.java:87) at org.apache.pulsar.broker.service.ServerCnx.handleRedeliverUnacknowledged(ServerCnx.java:1559) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:274) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:829) ``` ### Modifications Check `redeliver.hasConsumerEpoch()` before calling `redeliver.getConsumerEpoch()` in the debug log statement, and log `null` when the field is not set. ### Verifying this change - [ ] Make sure that the change passes the CI checks. This change is a trivial rework / code cleanup without any test coverage. 
### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (no) - The public API: (no) - The schema: (no) - The default values of configurations: (no) - The wire protocol: (no) - The rest endpoints: (no) - The admin cli options: (no) - Anything that affects deployment: (no) ### Documentation Check the box below and label this PR (if you have committer privilege). Need to update docs? - [x] `no-need-doc` --- .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4ad42c0..f216902 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1556,7 +1556,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { checkArgument(state == State.Connected); if (log.isDebugEnabled()) { log.debug("[{}] redeliverUnacknowledged from consumer {}, consumerEpoch {}", - remoteAddress, redeliver.getConsumerId(), redeliver.getConsumerEpoch()); + remoteAddress, redeliver.getConsumerId(), + redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null); } CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());
