This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d6fcdb8e1e1 [improve][broker,proxy] Use ChannelVoidPromise to avoid useless promise objects creation (#19141)
d6fcdb8e1e1 is described below

commit d6fcdb8e1e192e0d9e2fbe6307d273b27ed2930f
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Jan 10 16:23:24 2023 +0100

    [improve][broker,proxy] Use ChannelVoidPromise to avoid useless promise objects creation (#19141)
---
 .../broker/service/PulsarCommandSenderImpl.java    |  58 ++++-----
 .../apache/pulsar/broker/service/ServerCnx.java    | 130 +++++++++++----------
 .../pulsar/common/protocol/PulsarDecoder.java      |  30 +++--
 .../pulsar/common/util/netty/NettyChannelUtil.java |  63 ++++++++++
 .../common/util/netty/NettyChannelUtilTest.java    |  68 +++++++++++
 .../pulsar/proxy/server/DirectProxyHandler.java    |  30 ++---
 .../pulsar/proxy/server/LookupProxyHandler.java    |  47 ++++----
 .../pulsar/proxy/server/ProxyConnection.java       |  54 +++++----
 8 files changed, 324 insertions(+), 156 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index e7cc25b2c3b..6510da1fbe7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
 
 @Slf4j
 public class PulsarCommandSenderImpl implements PulsarCommandSender {
@@ -55,7 +56,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = 
Commands.newPartitionMetadataResponseCommand(error, errorMsg, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -63,7 +64,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = 
Commands.newPartitionMetadataResponseCommand(partitions, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -71,7 +72,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newSuccessCommand(requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -79,7 +80,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newErrorCommand(requestId, error, 
message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -87,7 +88,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newProducerSuccessCommand(requestId, 
producerName, schemaVersion);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -98,7 +99,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 schemaVersion, topicEpoch, isProducerReady);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -108,7 +109,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 entryId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -116,7 +117,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newSendErrorCommand(producerId, 
sequenceId, error, errorMsg);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -126,7 +127,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 filtered, changed, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -134,7 +135,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newGetSchemaResponseCommand(requestId, 
schema, version);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -142,7 +143,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = 
Commands.newGetSchemaResponseErrorCommand(requestId, error, errorMessage);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -150,7 +151,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = 
Commands.newGetOrCreateSchemaResponseCommand(requestId, schemaVersion);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -159,7 +160,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 Commands.newGetOrCreateSchemaResponseErrorCommand(requestId, 
error, errorMessage);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -168,7 +169,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 clientProtocolVersion, maxMessageSize, supportsTopicWatchers);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -179,7 +180,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 authoritative, response, requestId, proxyThroughServiceUrl);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -187,7 +188,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newLookupErrorResponseCommand(error, 
errorMsg, requestId);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -196,9 +197,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
             // if the client is older than `v12`, we don't need to send 
consumer group changes.
             return;
         }
-        cnx.ctx().writeAndFlush(
-                Commands.newActiveConsumerChange(consumerId, isActive),
-                cnx.ctx().voidPromise());
+        writeAndFlush(Commands.newActiveConsumerChange(consumerId, isActive));
     }
 
     @Override
@@ -206,7 +205,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         // Only send notification if the client understand the command
         if (cnx.getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v9.getValue()) {
             log.info("[{}] Notifying consumer that end of topic has been 
reached", this);
-            cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId), 
cnx.ctx().voidPromise());
+            writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
         }
     }
 
@@ -215,8 +214,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         // Only send notification if the client understand the command
         if (cnx.getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v20.getValue()) {
             log.info("[{}] Notifying {} that topic is migrated", type.name(), 
resourceId);
-            cnx.ctx().writeAndFlush(Commands.newTopicMigrated(type, 
resourceId, brokerUrl, brokerUrlTls),
-                    cnx.ctx().voidPromise());
+            writeAndFlush(Commands.newTopicMigrated(type, resourceId, 
brokerUrl, brokerUrlTls));
             return true;
         }
         return false;
@@ -310,7 +308,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newTcClientConnectResponse(requestId, 
error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -324,7 +322,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 txnID.getMostSigBits());
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
         if (this.interceptor != null) {
             this.interceptor.txnOpened(tcID, txnID.toString());
         }
@@ -335,7 +333,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
         BaseCommand command = Commands.newTxnResponse(requestId, tcID, error, 
message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
     }
 
     @Override
@@ -344,7 +342,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 txnID.getMostSigBits());
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
         if (this.interceptor != null) {
             this.interceptor.txnEnded(txnID.toString(), txnAction);
         }
@@ -356,7 +354,7 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
                 txnID.getMostSigBits(), error, message);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+        writeAndFlush(outBuf);
         if (this.interceptor != null) {
             this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
         }
@@ -378,7 +376,11 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
     private void interceptAndWriteCommand(BaseCommand command) {
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
-        cnx.ctx().writeAndFlush(outBuf);
+        writeAndFlush(outBuf);
+    }
+
+    private void writeAndFlush(ByteBuf outBuf) {
+        NettyChannelUtil.writeAndFlushWithVoidPromise(cnx.ctx(), outBuf);
     }
 
     private void safeIntercept(BaseCommand command, ServerCnx cnx) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a61468c6469..c0357c14a20 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -32,7 +32,6 @@ import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
@@ -161,6 +160,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
 import org.apache.pulsar.functions.utils.Exceptions;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
@@ -304,11 +304,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         super.channelActive(ctx);
         ConnectionController.State state = 
connectionController.increaseConnection(remoteAddress);
         if (!state.equals(ConnectionController.State.OK)) {
-            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.NotAllowedError,
-                            
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
-                                    ? "Reached the maximum number of 
connections"
-                                    : "Reached the maximum number of 
connections on address" + remoteAddress))
-                    .addListener(ChannelFutureListener.CLOSE);
+            final ByteBuf msg = Commands.newError(-1, 
ServerError.NotAllowedError,
+                    
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+                            ? "Reached the maximum number of connections"
+                            : "Reached the maximum number of connections on 
address" + remoteAddress);
+            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
             return;
         }
         log.info("New connection from {}", remoteAddress);
@@ -480,7 +480,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 log.debug("[{}] Failed lookup topic {} due to pulsar service 
is not ready: {} state", remoteAddress,
                         topicName, 
this.service.getPulsar().getState().toString());
             }
-            
ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+            writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
                     "Failed due to pulsar service is not ready", requestId));
             return;
         }
@@ -491,7 +491,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 final String msg = "Valid Proxy Client role should be provided 
for lookup ";
                 log.warn("[{}] {} with role {} and proxyClientAuthRole {} on 
topic {}", remoteAddress, msg, authRole,
                         originalPrincipal, topicName);
-                
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
+                
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
                 lookupSemaphore.release();
                 return;
             }
@@ -502,12 +502,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             getPrincipal(), getAuthenticationData(),
                             requestId, 
advertisedListenerName).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
-                                    ctx.writeAndFlush(lookupResponse);
+                                    writeAndFlush(lookupResponse);
                                 } else {
                                     // it should never happen
                                     log.warn("[{}] lookup failed with error 
{}, {}", remoteAddress, topicName,
                                             ex.getMessage(), ex);
-                                    
ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+                                    
writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
                                             ex.getMessage(), requestId));
                                 }
                                 lookupSemaphore.release();
@@ -516,14 +516,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 } else {
                     final String msg = "Client is not authorized to Lookup";
                     log.warn("[{}] {} with role {} on topic {}", 
remoteAddress, msg, getPrincipal(), topicName);
-                    
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
+                    
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
                     lookupSemaphore.release();
                 }
                 return null;
             }).exceptionally(ex -> {
                 logAuthException(remoteAddress, "lookup", getPrincipal(), 
Optional.of(topicName), ex);
                 final String msg = "Exception occurred while trying to 
authorize lookup";
-                
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
+                
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
                 lookupSemaphore.release();
                 return null;
             });
@@ -531,11 +531,15 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Failed lookup due to too many lookup-requests 
{}", remoteAddress, topicName);
             }
-            
ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
+            writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
                     "Failed due to too many pending lookup requests", 
requestId));
         }
     }
 
+    private void writeAndFlush(ByteBuf cmd) {
+        NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+    }
+
     @Override
     protected void 
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata 
partitionMetadata) {
         checkArgument(state == State.Connected);
@@ -557,7 +561,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         partitionMetadata.getTopic(), remoteAddress, requestId,
                         this.service.getPulsar().getState().toString());
             }
-            
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+            
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
                     "Failed due to pulsar service is not ready", requestId));
             return;
         }
@@ -607,7 +611,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 } else {
                     final String msg = "Client is not authorized to Get 
Partition Metadata";
                     log.warn("[{}] {} with role {} on topic {}", 
remoteAddress, msg, getPrincipal(), topicName);
-                    ctx.writeAndFlush(
+                    writeAndFlush(
                             
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, 
requestId));
                     lookupSemaphore.release();
                 }
@@ -615,7 +619,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }).exceptionally(ex -> {
                 logAuthException(remoteAddress, "partition-metadata", 
getPrincipal(), Optional.of(topicName), ex);
                 final String msg = "Exception occurred while trying to 
authorize get Partition Metadata";
-                
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
 msg,
+                
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
 msg,
                         requestId));
                 lookupSemaphore.release();
                 return null;
@@ -657,7 +661,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             msg = createConsumerStatsResponse(consumer, requestId);
         }
 
-        ctx.writeAndFlush(msg);
+        writeAndFlush(msg);
     }
 
     ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
@@ -687,7 +691,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     // complete the connect and sent newConnected command
     private void completeConnect(int clientProtoVersion, String clientVersion, 
boolean supportsTopicWatchers) {
-        ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, supportsTopicWatchers));
+        writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, supportsTopicWatchers));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
         if (log.isDebugEnabled()) {
@@ -763,7 +767,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
 
         // auth not complete, continue auth with client side.
-        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, 
clientProtocolVersion));
+        writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, 
clientProtocolVersion));
         if (log.isDebugEnabled()) {
             log.debug("[{}] Authentication in progress client by method {}.",
                 remoteAddress, authMethod);
@@ -813,7 +817,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             try {
                 AuthData brokerData = authState.refreshAuthentication();
 
-                ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, 
brokerData,
+                writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
                         getRemoteEndpointProtocolVersion()));
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Sent auth challenge to client to refresh 
credentials with method: {}.",
@@ -849,8 +853,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 log.debug("Failed CONNECT from {} due to pulsar service is not 
ready: {} state", remoteAddress,
                         this.service.getPulsar().getState().toString());
             }
-            ctx.writeAndFlush(
-                    Commands.newError(-1, ServerError.ServiceNotReady, "Failed 
due to pulsar service is not ready"));
+            writeAndFlush(
+                    Commands.newError(
+                            -1,
+                            ServerError.ServiceNotReady,
+                            "Failed due to pulsar service is not ready")
+            );
             close();
             return;
         }
@@ -961,7 +969,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             service.getPulsarStats().recordConnectionCreateFail();
             logAuthException(remoteAddress, "connect", getPrincipal(), 
Optional.empty(), e);
             String msg = "Unable to authenticate";
-            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, msg));
+            writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, msg));
             close();
         }
     }
@@ -985,13 +993,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         } catch (AuthenticationException e) {
             service.getPulsarStats().recordConnectionCreateFail();
             log.warn("[{}] Authentication failed: {} ", remoteAddress, 
e.getMessage());
-            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, e.getMessage()));
+            writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, e.getMessage()));
             close();
         } catch (Exception e) {
             service.getPulsarStats().recordConnectionCreateFail();
             String msg = "Unable to handleAuthResponse";
             log.warn("[{}] {} ", remoteAddress, msg, e);
-            ctx.writeAndFlush(Commands.newError(-1, ServerError.UnknownError, 
msg));
+            writeAndFlush(Commands.newError(-1, ServerError.UnknownError, 
msg));
             close();
         }
     }
@@ -1211,7 +1219,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 String msg = "Client is not authorized to subscribe";
                 log.warn("[{}] {} with role {}", remoteAddress, msg, 
getPrincipal());
                 consumers.remove(consumerId, consumerFuture);
-                ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
+                writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
             }
             return null;
         }).exceptionally(ex -> {
@@ -1288,7 +1296,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             if (!isAuthorized) {
                 String msg = "Client is not authorized to Produce";
                 log.warn("[{}] {} with role {}", remoteAddress, msg, 
getPrincipal());
-                ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
+                writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
                 return null;
             }
 
@@ -1658,7 +1666,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             Consumer consumer = consumerFuture.getNow(null);
             consumer.messageAcked(ack).thenRun(() -> {
                 if (hasRequestId) {
-                    ctx.writeAndFlush(Commands.newAckResponse(
+                    writeAndFlush(Commands.newAckResponse(
                             requestId, null, null, consumerId));
                 }
                 if (brokerInterceptor != null) {
@@ -1666,7 +1674,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 }
             }).exceptionally(e -> {
                 if (hasRequestId) {
-                    ctx.writeAndFlush(Commands.newAckResponse(requestId,
+                    writeAndFlush(Commands.newAckResponse(requestId,
                             BrokerServiceException.getClientErrorCode(e),
                             e.getMessage(), consumerId));
                 }
@@ -1825,7 +1833,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         CompletableFuture<Producer> producerFuture = producers.get(producerId);
         if (producerFuture == null) {
             log.info("[{}] Producer {} was not registered on the connection", 
remoteAddress, producerId);
-            ctx.writeAndFlush(Commands.newSuccess(requestId));
+            writeAndFlush(Commands.newSuccess(requestId));
             return;
         }
 
@@ -1874,7 +1882,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
         if (consumerFuture == null) {
             log.info("[{}] Consumer was not registered on the connection: {}", 
consumerId, remoteAddress);
-            ctx.writeAndFlush(Commands.newSuccess(requestId));
+            writeAndFlush(Commands.newSuccess(requestId));
             return;
         }
 
@@ -1941,7 +1949,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     consumer.getSubscription().getName());
 
         } else {
-            
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
+            writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
                     ServerError.MetadataError, "Consumer not found"));
         }
     }
@@ -1994,7 +2002,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     handleLastMessageIdFromCompactedLedger(persistentTopic, 
requestId, partitionIndex,
                             markDeletePosition);
                 } else {
-                    ctx.writeAndFlush(Commands.newError(
+                    writeAndFlush(Commands.newError(
                             requestId, ServerError.MetadataError,
                             "Failed to get batch size for entry " + 
e.getMessage()));
                 }
@@ -2006,7 +2014,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             topic.getName(), subscriptionName, lastPosition, 
partitionIndex);
                 }
 
-                
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
lastPosition.getLedgerId(),
+                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
lastPosition.getLedgerId(),
                         lastPosition.getEntryId(), partitionIndex, 
largestBatchIndex,
                         markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
                         markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
@@ -2026,12 +2034,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 try {
                     largestBatchIndex = 
calculateTheLastBatchIndexInBatch(metadata, payload);
                 } catch (IOException ioEx){
-                    ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.MetadataError,
+                    writeAndFlush(Commands.newError(requestId, 
ServerError.MetadataError,
                             "Failed to deserialize batched message from the 
last entry of the compacted Ledger: "
                                     + ioEx.getMessage()));
                     return;
                 }
-                
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
                         entry.getLedgerId(), entry.getEntryId(), 
partitionIndex, largestBatchIndex,
                         markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
                         markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
@@ -2039,13 +2047,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             } else {
                 // in this case, the ledgers been removed except the current 
ledger
                 // and current ledger without any data
-                
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
                         -1, -1, partitionIndex, -1,
                         markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
                         markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
             }
         }).exceptionally(ex -> {
-            ctx.writeAndFlush(Commands.newError(
+            writeAndFlush(Commands.newError(
                     requestId, ServerError.MetadataError,
                     "Failed to read last entry of the compacted Ledger "
                             + ex.getCause().getMessage()));
@@ -2401,12 +2409,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published 
partition to txn request {}", requestId);
                         }
-                        
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+                        
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                     } else {
                         ex = handleTxnException(ex, 
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
 
-                        
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+                        
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(),
                                 txnID.getMostSigBits(),
                                 BrokerServiceException.getClientErrorCode(ex),
@@ -2468,13 +2476,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             if (throwable != null) {
                                 log.error("handleEndTxnOnPartition fail!, 
topic {}, txnId: [{}], "
                                         + "txnAction: [{}]", topic, txnID, 
TxnAction.valueOf(txnAction), throwable);
-                                
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+                                
writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                                         requestId, 
BrokerServiceException.getClientErrorCode(throwable),
                                         throwable.getMessage(),
                                         txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                                 return;
                             }
-                            
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                            
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                     txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                         });
 
@@ -2486,7 +2494,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 log.error("handleEndTxnOnPartition fail ! The 
topic {} does not exist in broker, "
                                                 + "txnId: [{}], txnAction: 
[{}]", topic,
                                         txnID, TxnAction.valueOf(txnAction));
-                                
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                                
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                         ServerError.ServiceNotReady,
                                         "The topic " + topic + " does not 
exist in broker.",
                                         txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
@@ -2494,14 +2502,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 log.warn("handleEndTxnOnPartition fail ! The 
topic {} has not been created, "
                                                 + "txnId: [{}], txnAction: 
[{}]",
                                         topic, txnID, 
TxnAction.valueOf(txnAction));
-                                
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+                                
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
                                         txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
                             log.error("handleEndTxnOnPartition fail ! topic 
{}, "
                                             + "txnId: [{}], txnAction: [{}]", 
topic, txnID,
                                     TxnAction.valueOf(txnAction), 
e.getCause());
-                            
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+                            
writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                                     requestId, ServerError.ServiceNotReady,
                                     e.getMessage(), txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                             return null;
@@ -2511,7 +2519,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             log.error("handleEndTxnOnPartition fail ! topic {}, "
                             + "txnId: [{}], txnAction: [{}]", topic, txnID,
                     TxnAction.valueOf(txnAction), e.getCause());
-            ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+            writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                     requestId, ServerError.ServiceNotReady,
                     e.getMessage(), txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
             return null;
@@ -2543,7 +2551,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     log.warn("handleEndTxnOnSubscription fail! "
                                     + "topic {} subscription {} does not 
exist. txnId: [{}], txnAction: [{}]",
                             optionalTopic.get().getName(), subName, txnID, 
TxnAction.valueOf(txnAction));
-                    ctx.writeAndFlush(
+                    writeAndFlush(
                             
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
                     return;
                 }
@@ -2555,13 +2563,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         log.error("handleEndTxnOnSubscription fail ! topic: 
{}, subscription: {}"
                                         + "txnId: [{}], txnAction: [{}]", 
topic, subName,
                                 txnID, TxnAction.valueOf(txnAction), 
e.getCause());
-                        
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                        writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                 requestId, txnidLeastBits, txnidMostBits,
                                 BrokerServiceException.getClientErrorCode(e),
                                 "Handle end txn on subscription failed: " + 
e.getMessage()));
                         return;
                     }
-                    ctx.writeAndFlush(
+                    writeAndFlush(
                             
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, 
txnidMostBits));
                 });
             } else {
@@ -2572,7 +2580,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 log.error("handleEndTxnOnSubscription fail! 
The topic {} does not exist in broker, "
                                                 + "subscription: {}, txnId: 
[{}], txnAction: [{}]", topic, subName,
                                         txnID, TxnAction.valueOf(txnAction));
-                                
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                                
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                         requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
                                         ServerError.ServiceNotReady,
                                         "The topic " + topic + " does not 
exist in broker."));
@@ -2580,14 +2588,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 log.warn("handleEndTxnOnSubscription fail ! 
The topic {} has not been created, "
                                                 + "subscription: {} txnId: 
[{}], txnAction: [{}]",
                                         topic, subName, txnID, 
TxnAction.valueOf(txnAction));
-                                
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
+                                
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
                                         txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                             }
                         }).exceptionally(e -> {
                             log.error("handleEndTxnOnSubscription fail ! topic 
{}, subscription: {}"
                                             + "txnId: [{}], txnAction: [{}]", 
topic, subName,
                                     txnID, TxnAction.valueOf(txnAction), 
e.getCause());
-                            
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+                            
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                                     requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
                                     ServerError.ServiceNotReady, 
e.getMessage()));
                             return null;
@@ -2597,7 +2605,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             log.error("handleEndTxnOnSubscription fail ! topic: {}, 
subscription: {}"
                             + "txnId: [{}], txnAction: [{}]", topic, subName,
                     txnID, TxnAction.valueOf(txnAction), e.getCause());
-            ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+            writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                     requestId, txnidLeastBits, txnidMostBits,
                     ServerError.ServiceNotReady,
                     "Handle end txn on subscription failed: " + 
e.getMessage()));
@@ -2652,12 +2660,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             log.debug("Send response success for add published 
partition to txn request {}",
                                     requestId);
                         }
-                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                        
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                     } else {
                         ex = handleTxnException(ex, 
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
-
-                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, 
txnID.getLeastSigBits(),
+                        writeAndFlush(
+                                
Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(),
                                 txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
@@ -2743,7 +2751,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         // removes producer-connection from map and send close command to 
producer
         safelyRemoveProducer(producer);
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
-            
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
+            writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), 
-1L));
         } else {
             close();
         }
@@ -2755,7 +2763,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         // removes consumer-connection from map and send close command to 
consumer
         safelyRemoveConsumer(consumer);
         if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
-            ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), 
-1L));
+            writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), 
-1L));
         } else {
             close();
         }
@@ -2998,13 +3006,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
 
             if (requestCommand instanceof CommandLookupTopic) {
-                
ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName,
+                
writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName,
                         "Invalid topic name: " + t.getMessage(), requestId));
             } else if (requestCommand instanceof 
CommandPartitionedTopicMetadata) {
-                
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName,
+                
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName,
                         "Invalid topic name: " + t.getMessage(), requestId));
             } else {
-                ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.InvalidTopicName,
+                writeAndFlush(Commands.newError(requestId, 
ServerError.InvalidTopicName,
                         "Invalid topic name: " + t.getMessage()));
             }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 72212fe16c7..496652fed0b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundInvoker;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -84,6 +85,7 @@ import 
org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,7 +133,8 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     interceptCommand(cmd);
                     handlePartitionMetadataRequest(cmd.getPartitionMetadata());
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
+                    writeAndFlush(ctx,
+                            
Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
                             e.getMessage(), 
cmd.getPartitionMetadata().getRequestId()));
                 }
                 break;
@@ -205,7 +208,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     interceptCommand(cmd);
                     handleProducer(cmd.getProducer());
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newError(cmd.getProducer().getRequestId(),
+                    writeAndFlush(ctx, 
Commands.newError(cmd.getProducer().getRequestId(),
                             getServerError(e.getErrorCode()), e.getMessage()));
                 }
                 break;
@@ -218,7 +221,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     ByteBuf headersAndPayload = buffer.markReaderIndex();
                     handleSend(cmd.getSend(), headersAndPayload);
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newSendError(cmd.getSend().getProducerId(),
+                    writeAndFlush(ctx, 
Commands.newSendError(cmd.getSend().getProducerId(),
                             cmd.getSend().getSequenceId(), 
getServerError(e.getErrorCode()), e.getMessage()));
                 }
                 break;
@@ -239,7 +242,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     interceptCommand(cmd);
                     handleSubscribe(cmd.getSubscribe());
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newError(cmd.getSubscribe().getRequestId(),
+                    writeAndFlush(ctx, 
Commands.newError(cmd.getSubscribe().getRequestId(),
                             getServerError(e.getErrorCode()), e.getMessage()));
                 }
                 break;
@@ -266,8 +269,13 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     interceptCommand(cmd);
                     handleSeek(cmd.getSeek());
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newError(cmd.getSeek().getRequestId(), 
getServerError(e.getErrorCode()),
-                            e.getMessage()));
+                    writeAndFlush(ctx,
+                            Commands.newError(
+                                    cmd.getSeek().getRequestId(),
+                                    getServerError(e.getErrorCode()),
+                                    e.getMessage()
+                            )
+                    );
                 }
                 break;
 
@@ -326,7 +334,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     interceptCommand(cmd);
                     handleGetTopicsOfNamespace(cmd.getGetTopicsOfNamespace());
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newError(cmd.getGetTopicsOfNamespace().getRequestId(),
+                    writeAndFlush(ctx, 
Commands.newError(cmd.getGetTopicsOfNamespace().getRequestId(),
                             getServerError(e.getErrorCode()), e.getMessage()));
                 }
                 break;
@@ -342,7 +350,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     interceptCommand(cmd);
                     handleGetSchema(cmd.getGetSchema());
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newGetSchemaResponseError(cmd.getGetSchema().getRequestId(),
+                    writeAndFlush(ctx, 
Commands.newGetSchemaResponseError(cmd.getGetSchema().getRequestId(),
                             getServerError(e.getErrorCode()), e.getMessage()));
                 }
                 break;
@@ -358,7 +366,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                     interceptCommand(cmd);
                     handleGetOrCreateSchema(cmd.getGetOrCreateSchema());
                 } catch (InterceptException e) {
-                    
ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
+                    writeAndFlush(ctx, 
Commands.newGetOrCreateSchemaResponseError(
                             cmd.getGetOrCreateSchema().getRequestId(), 
getServerError(e.getErrorCode()),
                             e.getMessage()));
                 }
@@ -731,4 +739,8 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarDecoder.class);
+
+    private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
+        NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyChannelUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyChannelUtil.java
new file mode 100644
index 00000000000..c466822e205
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyChannelUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.VoidChannelPromise;
+
+/**
+ * Contains utility methods for working with Netty Channels.
+ */
+public final class NettyChannelUtil {
+
+    private NettyChannelUtil() {
+    }
+
+    /**
+     * Write and flush the message to the channel.
+     *
+     * The promise is an instance of {@link VoidChannelPromise} that properly 
propagates exceptions up to the pipeline.
+     * Netty has many ad-hoc optimizations if the promise is an instance of 
{@link VoidChannelPromise}.
+     * Lastly, it reduces pollution of useless {@link 
io.netty.channel.ChannelPromise} objects created
+     * by the default write and flush method {@link 
ChannelOutboundInvoker#writeAndFlush(Object)}.
+     * See https://stackoverflow.com/q/54169262 and 
https://stackoverflow.com/a/9030420 for more details.
+     *
+     * @param ctx channel's context
+     * @param msg buffer to write in the channel
+     */
+    public static void writeAndFlushWithVoidPromise(ChannelOutboundInvoker 
ctx, ByteBuf msg) {
+        ctx.writeAndFlush(msg, ctx.voidPromise());
+    }
+
+    /**
+     * Write and flush the message to the channel and then close the channel.
+     *
+     * This method is particularly helpful when the connection is in an 
invalid state
+     * and therefore a new connection must be created to continue.
+     *
+     * @param ctx channel's context
+     * @param msg buffer to write in the channel
+     */
+    public static void writeAndFlushWithClosePromise(ChannelOutboundInvoker 
ctx, ByteBuf msg) {
+        ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+    }
+
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/NettyChannelUtilTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/NettyChannelUtilTest.java
new file mode 100644
index 00000000000..53a5f91de1d
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/NettyChannelUtilTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.VoidChannelPromise;
+import java.nio.charset.StandardCharsets;
+import org.testng.annotations.Test;
+
+public class NettyChannelUtilTest {
+
+    @Test
+    public void testWriteAndFlushWithVoidPromise() {
+        final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+        final VoidChannelPromise voidChannelPromise = 
mock(VoidChannelPromise.class);
+        when(ctx.voidPromise()).thenReturn(voidChannelPromise);
+        final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+        final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+        try {
+            NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, byteBuf);
+            verify(ctx).writeAndFlush(same(byteBuf), same(voidChannelPromise));
+            verify(ctx).voidPromise();
+        } finally {
+            byteBuf.release();
+        }
+    }
+
+    @Test
+    public void testWriteAndFlushWithClosePromise() {
+        final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+        final ChannelPromise promise = mock(ChannelPromise.class);
+
+        final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+        final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+        when(ctx.writeAndFlush(same(byteBuf))).thenReturn(promise);
+        try {
+            NettyChannelUtil.writeAndFlushWithClosePromise(ctx, byteBuf);
+            verify(ctx).writeAndFlush(same(byteBuf));
+            verify(promise).addListener(same(ChannelFutureListener.CLOSE));
+        } finally {
+            byteBuf.release();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index d51f17b4731..4b5fef3a994 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -26,7 +26,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
@@ -65,6 +64,7 @@ import 
org.apache.pulsar.common.util.NettyClientSslContextRefresher;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
 import 
org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -238,8 +238,8 @@ public class DirectProxyHandler {
 
     private void writeHAProxyMessage() {
         if (proxyConnection.hasHAProxyMessage()) {
-            
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()))
-                    
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            final ByteBuf msg = 
encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage());
+            writeAndFlush(msg);
         } else {
             if (inboundChannel.remoteAddress() instanceof InetSocketAddress
                     && inboundChannel.localAddress() instanceof 
InetSocketAddress) {
@@ -252,8 +252,8 @@ public class DirectProxyHandler {
                 HAProxyMessage msg = new 
HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
                         HAProxyProxiedProtocol.TCP4, sourceAddress, 
destinationAddress, sourcePort,
                         destinationPort);
-                outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg))
-                        
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+                final ByteBuf encodedMsg = encodeProxyProtocolMessage(msg);
+                writeAndFlush(encodedMsg);
                 msg.release();
             }
         }
@@ -323,11 +323,11 @@ public class DirectProxyHandler {
             // Send the Connect command to broker
             authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
             AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-            ByteBuf command;
-            command = Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion, "Pulsar proxy",
-                    null /* target broker */, originalPrincipal, 
clientAuthData, clientAuthMethod);
-            outboundChannel.writeAndFlush(command)
-                    
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            ByteBuf command = Commands.newConnect(
+                    authentication.getAuthMethodName(), authData, 
protocolVersion,
+                    "Pulsar proxy", null /* target broker */,
+                    originalPrincipal, clientAuthData, clientAuthMethod);
+            writeAndFlush(command);
             isTlsOutboundChannel = 
ProxyConnection.isTlsChannel(inboundChannel);
         }
 
@@ -358,8 +358,7 @@ public class DirectProxyHandler {
                 if (msg instanceof ByteBuf) {
                     ProxyService.BYTES_COUNTER.inc(((ByteBuf) 
msg).readableBytes());
                 }
-                inboundChannel.writeAndFlush(msg)
-                        
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+                inboundChannel.writeAndFlush(msg, 
inboundChannel.voidPromise());
 
                 if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel 
== 0) {
                     if (!isTlsOutboundChannel && 
!DirectProxyHandler.this.proxyConnection.isTlsInboundChannel) {
@@ -412,8 +411,7 @@ public class DirectProxyHandler {
                     log.debug("{} Mutual auth {}", ctx.channel(), 
authentication.getAuthMethodName());
                 }
 
-                outboundChannel.writeAndFlush(request)
-                        
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+                writeAndFlush(request);
             } catch (Exception e) {
                 log.error("Error mutual verify", e);
             }
@@ -495,5 +493,9 @@ public class DirectProxyHandler {
         }
     }
 
+    private void writeAndFlush(ByteBuf cmd) {
+        NettyChannelUtil.writeAndFlushWithVoidPromise(outboundChannel, cmd);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 5101978c3e2..6ec597ec1cf 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
 import io.prometheus.client.Counter;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -38,6 +39,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,7 +115,7 @@ public class LookupProxyHandler {
                 log.debug("Lookup Request ID {} from {} rejected - {}.", 
clientRequestId, clientAddress,
                         throttlingErrorMessage);
             }
-            proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+            writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
                     throttlingErrorMessage, clientRequestId));
         }
 
@@ -122,7 +124,7 @@ public class LookupProxyHandler {
     private void performLookup(long clientRequestId, String topic, String 
brokerServiceUrl, boolean authoritative,
             int numberOfRetries) {
         if (numberOfRetries == 0) {
-            
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+            
writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
                     "Reached max number of redirections", clientRequestId));
             return;
         }
@@ -131,7 +133,7 @@ public class LookupProxyHandler {
         try {
             brokerURI = new URI(brokerServiceUrl);
         } catch (URISyntaxException e) {
-            proxyConnection.ctx().writeAndFlush(
+            writeAndFlush(
                     Commands.newLookupErrorResponse(ServerError.MetadataError, 
e.getMessage(), clientRequestId));
             return;
         }
@@ -150,7 +152,7 @@ public class LookupProxyHandler {
             clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
                 if (t != null) {
                     log.warn("[{}] Failed to lookup topic {}: {}", 
clientAddress, topic, t.getMessage());
-                    proxyConnection.ctx().writeAndFlush(
+                    writeAndFlush(
                         Commands.newLookupErrorResponse(getServerError(t), 
t.getMessage(), clientRequestId));
                 } else {
                     String brokerUrl = connectWithTLS ? r.brokerUrlTls : 
r.brokerUrl;
@@ -170,7 +172,7 @@ public class LookupProxyHandler {
                                             + " with clientReq Id '{}' and 
lookup-broker {}",
                                     addr, topic, clientRequestId, brokerUrl);
                         }
-                        
proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, 
brokerUrl, true,
+                        writeAndFlush(Commands.newLookupResponse(brokerUrl, 
brokerUrl, true,
                             LookupType.Connect, clientRequestId, true /* this 
is coming from proxy */));
                     }
                 }
@@ -178,7 +180,7 @@ public class LookupProxyHandler {
             });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
-            proxyConnection.ctx().writeAndFlush(
+            writeAndFlush(
                     Commands.newLookupErrorResponse(getServerError(ex), 
ex.getMessage(), clientRequestId));
             return null;
         });
@@ -202,7 +204,7 @@ public class LookupProxyHandler {
                 log.debug("PartitionMetaData Request ID {} from {} rejected - 
{}.", clientRequestId, clientAddress,
                         throttlingErrorMessage);
             }
-            
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+            
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
                     throttlingErrorMessage, clientRequestId));
         }
     }
@@ -239,17 +241,17 @@ public class LookupProxyHandler {
                 if (t != null) {
                     log.warn("[{}] failed to get Partitioned metadata : {}", 
topicName.toString(),
                         t.getMessage(), t);
-                    
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
+                    
writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
                         t.getMessage(), clientRequestId));
                 } else {
-                    proxyConnection.ctx().writeAndFlush(
+                    writeAndFlush(
                         Commands.newPartitionMetadataResponse(r.partitions, 
clientRequestId));
                 }
                 
proxyConnection.getConnectionPool().releaseConnection(clientCnx);
             });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
-            
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(ex),
+            
writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(ex),
                     ex.getMessage(), clientRequestId));
             return null;
         });
@@ -275,7 +277,7 @@ public class LookupProxyHandler {
                 log.debug("GetTopicsOfNamespace Request ID {} from {} rejected 
- {}.", requestId, clientAddress,
                     throttlingErrorMessage);
             }
-            proxyConnection.ctx().writeAndFlush(Commands.newError(
+            writeAndFlush(Commands.newError(
                 requestId, ServerError.ServiceNotReady, throttlingErrorMessage
             ));
         }
@@ -304,7 +306,7 @@ public class LookupProxyHandler {
                                              String topicsHash,
                                              CommandGetTopicsOfNamespace.Mode 
mode) {
         if (numberOfRetries == 0) {
-            
proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId, 
ServerError.ServiceNotReady,
+            writeAndFlush(Commands.newError(clientRequestId, 
ServerError.ServiceNotReady,
                     "Reached max number of redirections"));
             return;
         }
@@ -329,10 +331,10 @@ public class LookupProxyHandler {
                 if (t != null) {
                     log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
                             clientAddress, namespaceName, t.getMessage());
-                    proxyConnection.ctx().writeAndFlush(
+                    writeAndFlush(
                         Commands.newError(clientRequestId, getServerError(t), 
t.getMessage()));
                 } else {
-                    proxyConnection.ctx().writeAndFlush(
+                    writeAndFlush(
                         
Commands.newGetTopicsOfNamespaceResponse(r.getTopics(), r.getTopicsHash(), 
r.isFiltered(),
                                 r.isChanged(), clientRequestId));
                 }
@@ -341,7 +343,7 @@ public class LookupProxyHandler {
             proxyConnection.getConnectionPool().releaseConnection(clientCnx);
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
-            proxyConnection.ctx().writeAndFlush(
+            writeAndFlush(
                     Commands.newError(clientRequestId, getServerError(ex), 
ex.getMessage()));
             return null;
         });
@@ -384,10 +386,10 @@ public class LookupProxyHandler {
             clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) 
-> {
                 if (t != null) {
                     log.warn("[{}] Failed to get schema {}: {}", 
clientAddress, topic, t);
-                    proxyConnection.ctx().writeAndFlush(
+                    writeAndFlush(
                         Commands.newError(clientRequestId, getServerError(t), 
t.getMessage()));
                 } else {
-                    proxyConnection.ctx().writeAndFlush(
+                    writeAndFlush(
                         Commands.newGetSchemaResponse(clientRequestId, r));
                 }
 
@@ -395,7 +397,7 @@ public class LookupProxyHandler {
             });
         }).exceptionally(ex -> {
             // Failed to connect to backend broker
-            proxyConnection.ctx().writeAndFlush(
+            writeAndFlush(
                     Commands.newError(clientRequestId, getServerError(ex), 
ex.getMessage()));
             return null;
         });
@@ -414,7 +416,7 @@ public class LookupProxyHandler {
             availableBroker = discoveryProvider.nextBroker();
         } catch (Exception e) {
             log.warn("[{}] Failed to get next active broker {}", 
clientAddress, e.getMessage(), e);
-            proxyConnection.ctx().writeAndFlush(Commands.newError(
+            writeAndFlush(Commands.newError(
                     clientRequestId, ServerError.ServiceNotReady, 
e.getMessage()
             ));
             return null;
@@ -427,7 +429,7 @@ public class LookupProxyHandler {
         try {
             brokerURI = new URI(brokerServiceUrl);
         } catch (URISyntaxException e) {
-            proxyConnection.ctx().writeAndFlush(
+            writeAndFlush(
                     Commands.newError(clientRequestId, 
ServerError.MetadataError, e.getMessage()));
             return null;
         }
@@ -446,5 +448,10 @@ public class LookupProxyHandler {
         return responseError;
     }
 
+    private void writeAndFlush(ByteBuf cmd) {
+        final ChannelHandlerContext ctx = proxyConnection.ctx();
+        NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(LookupProxyHandler.class);
 }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 56fbe946069..fe4d29ac4ab 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
 import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -261,8 +262,8 @@ public class ProxyConnection extends PulsarHandler {
                     
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
                     ProxyService.BYTES_COUNTER.inc(bytes);
                 }
-                directProxyHandler.outboundChannel.writeAndFlush(msg)
-                        
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+                directProxyHandler.outboundChannel
+                        .writeAndFlush(msg, 
directProxyHandler.outboundChannel.voidPromise());
 
                 if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel 
== 0) {
                     if (!directProxyHandler.isTlsOutboundChannel && 
!isTlsInboundChannel) {
@@ -345,10 +346,9 @@ public class ProxyConnection extends PulsarHandler {
                 state = State.Closing;
                 LOG.warn("[{}] Target broker '{}' isn't available. 
authenticated with {} role {}.",
                         remoteAddress, proxyToBrokerUrl, authMethod, 
clientAuthRole);
-                ctx()
-                        .writeAndFlush(
-                                Commands.newError(-1, 
ServerError.ServiceNotReady, "Target broker isn't available."))
-                        .addListener(ChannelFutureListener.CLOSE);
+                final ByteBuf msg = Commands.newError(-1,
+                        ServerError.ServiceNotReady, "Target broker isn't 
available.");
+                writeAndFlushAndClose(msg);
                 return;
             }
 
@@ -369,11 +369,9 @@ public class ProxyConnection extends PulsarHandler {
                             LOG.error("[{}] Error validating target broker 
'{}'. authenticated with {} role {}.",
                                     remoteAddress, proxyToBrokerUrl, 
authMethod, clientAuthRole, throwable);
                         }
-                        ctx()
-                                .writeAndFlush(
-                                        Commands.newError(-1, 
ServerError.ServiceNotReady,
-                                                "Target broker cannot be 
validated."))
-                                .addListener(ChannelFutureListener.CLOSE);
+                        final ByteBuf msg = Commands.newError(-1, 
ServerError.ServiceNotReady,
+                                "Target broker cannot be validated.");
+                        writeAndFlushAndClose(msg);
                         return null;
                     });
         } else {
@@ -382,8 +380,8 @@ public class ProxyConnection extends PulsarHandler {
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
-            
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise, false))
-                    
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            final ByteBuf msg = 
Commands.newConnected(protocolVersionToAdvertise, false);
+            writeAndFlush(msg);
         }
     }
 
@@ -394,9 +392,9 @@ public class ProxyConnection extends PulsarHandler {
             state = State.ProxyConnectionToBroker;
             int maxMessageSize =
                     connected.hasMaxMessageSize() ? 
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-            
ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), 
maxMessageSize,
-                            connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers()))
-                    
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            final ByteBuf msg = 
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
+                    connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers());
+            writeAndFlush(msg);
         } else {
             LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
                             + "Closing connection to broker '{}'.",
@@ -445,8 +443,8 @@ public class ProxyConnection extends PulsarHandler {
         }
 
         // auth not complete, continue auth with client side.
-        ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, 
protocolVersionToAdvertise))
-                .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+        final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, 
protocolVersionToAdvertise);
+        writeAndFlush(msg);
         if (LOG.isDebugEnabled()) {
             LOG.debug("[{}] Authentication in progress client by method {}.",
                     remoteAddress, authMethod);
@@ -524,8 +522,8 @@ public class ProxyConnection extends PulsarHandler {
             doAuthentication(clientData);
         } catch (Exception e) {
             LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
-            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, "Failed to authenticate"))
-                    .addListener(ChannelFutureListener.CLOSE);
+            final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, "Failed to authenticate");
+            writeAndFlushAndClose(msg);
         }
     }
 
@@ -589,10 +587,10 @@ public class ProxyConnection extends PulsarHandler {
                 });
             }
         } catch (Exception e) {
-            String msg = "Unable to handleAuthResponse";
-            LOG.warn("[{}] {} ", remoteAddress, msg, e);
-            ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, msg))
-                    .addListener(ChannelFutureListener.CLOSE);
+            String errorMsg = "Unable to handleAuthResponse";
+            LOG.warn("[{}] {} ", remoteAddress, errorMsg, e);
+            final ByteBuf msg = Commands.newError(-1, 
ServerError.AuthenticationError, errorMsg);
+            writeAndFlushAndClose(msg);
         }
     }
 
@@ -727,4 +725,12 @@ public class ProxyConnection extends PulsarHandler {
                 && pulsarServiceUrl.startsWith(expectedPrefix)
                 && pulsarServiceUrl.startsWith(brokerHostPort, 
expectedPrefix.length());
     }
+
+    private void writeAndFlush(ByteBuf cmd) {
+        NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+    }
+
+    private void writeAndFlushAndClose(ByteBuf cmd) {
+        NettyChannelUtil.writeAndFlushWithClosePromise(ctx, cmd);
+    }
 }

Reply via email to