This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 278531f2f923c1c0c31364c804e6650a0628dafa Author: Lari Hotari <[email protected]> AuthorDate: Fri Apr 29 17:07:11 2022 +0300 [Broker/Client] Close connection if a ping or pong message cannot be sent (#15382) * [Broker/Client] Close connection if a ping message cannot be sent - the connection should be closed if a ping message cannot be sent * Handle write errors for pong messages (cherry picked from commit 2ddef95f31ce37486f3f76b4d59730361a77bf6e) --- .../apache/pulsar/common/protocol/PulsarHandler.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java index 48e7ffa7b88..6a0b7bb1fa5 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java @@ -83,7 +83,14 @@ public abstract class PulsarHandler extends PulsarDecoder { if (log.isDebugEnabled()) { log.debug("[{}] Replying back to ping message", ctx.channel()); } - ctx.writeAndFlush(Commands.newPong()); + ctx.writeAndFlush(Commands.newPong()) + .addListener(future -> { + if (!future.isSuccess()) { + log.warn("[{}] Forcing connection to close since cannot send a pong message.", + ctx.channel(), future.cause()); + ctx.close(); + } + }); } @Override @@ -110,7 +117,14 @@ public abstract class PulsarHandler extends PulsarDecoder { log.debug("[{}] Sending ping message", ctx.channel()); } waitingForPingResponse = true; - ctx.writeAndFlush(Commands.newPing()); + ctx.writeAndFlush(Commands.newPing()) + .addListener(future -> { + if (!future.isSuccess()) { + log.warn("[{}] Forcing connection to close since cannot send a ping message.", + ctx.channel(), future.cause()); + ctx.close(); + } + }); } else { if (log.isDebugEnabled()) { log.debug("[{}] Peer doesn't support keep-alive", ctx.channel());
