This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d6fcdb8e1e1 [improve][broker,proxy] Use ChannelVoidPromise to avoid
useless promise objects creation (#19141)
d6fcdb8e1e1 is described below
commit d6fcdb8e1e192e0d9e2fbe6307d273b27ed2930f
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Jan 10 16:23:24 2023 +0100
[improve][broker,proxy] Use ChannelVoidPromise to avoid useless promise
objects creation (#19141)
---
.../broker/service/PulsarCommandSenderImpl.java | 58 ++++-----
.../apache/pulsar/broker/service/ServerCnx.java | 130 +++++++++++----------
.../pulsar/common/protocol/PulsarDecoder.java | 30 +++--
.../pulsar/common/util/netty/NettyChannelUtil.java | 63 ++++++++++
.../common/util/netty/NettyChannelUtilTest.java | 68 +++++++++++
.../pulsar/proxy/server/DirectProxyHandler.java | 30 ++---
.../pulsar/proxy/server/LookupProxyHandler.java | 47 ++++----
.../pulsar/proxy/server/ProxyConnection.java | 54 +++++----
8 files changed, 324 insertions(+), 156 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index e7cc25b2c3b..6510da1fbe7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
@Slf4j
public class PulsarCommandSenderImpl implements PulsarCommandSender {
@@ -55,7 +56,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command =
Commands.newPartitionMetadataResponseCommand(error, errorMsg, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -63,7 +64,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command =
Commands.newPartitionMetadataResponseCommand(partitions, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -71,7 +72,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newSuccessCommand(requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -79,7 +80,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newErrorCommand(requestId, error,
message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -87,7 +88,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newProducerSuccessCommand(requestId,
producerName, schemaVersion);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -98,7 +99,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
schemaVersion, topicEpoch, isProducerReady);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -108,7 +109,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
entryId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -116,7 +117,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newSendErrorCommand(producerId,
sequenceId, error, errorMsg);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -126,7 +127,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
filtered, changed, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -134,7 +135,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newGetSchemaResponseCommand(requestId,
schema, version);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -142,7 +143,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command =
Commands.newGetSchemaResponseErrorCommand(requestId, error, errorMessage);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -150,7 +151,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command =
Commands.newGetOrCreateSchemaResponseCommand(requestId, schemaVersion);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -159,7 +160,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
Commands.newGetOrCreateSchemaResponseErrorCommand(requestId,
error, errorMessage);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -168,7 +169,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
clientProtocolVersion, maxMessageSize, supportsTopicWatchers);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -179,7 +180,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
authoritative, response, requestId, proxyThroughServiceUrl);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -187,7 +188,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newLookupErrorResponseCommand(error,
errorMsg, requestId);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -196,9 +197,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
// if the client is older than `v12`, we don't need to send
consumer group changes.
return;
}
- cnx.ctx().writeAndFlush(
- Commands.newActiveConsumerChange(consumerId, isActive),
- cnx.ctx().voidPromise());
+ writeAndFlush(Commands.newActiveConsumerChange(consumerId, isActive));
}
@Override
@@ -206,7 +205,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
// Only send notification if the client understand the command
if (cnx.getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v9.getValue()) {
log.info("[{}] Notifying consumer that end of topic has been
reached", this);
- cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId),
cnx.ctx().voidPromise());
+ writeAndFlush(Commands.newReachedEndOfTopic(consumerId));
}
}
@@ -215,8 +214,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
// Only send notification if the client understand the command
if (cnx.getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v20.getValue()) {
log.info("[{}] Notifying {} that topic is migrated", type.name(),
resourceId);
- cnx.ctx().writeAndFlush(Commands.newTopicMigrated(type,
resourceId, brokerUrl, brokerUrlTls),
- cnx.ctx().voidPromise());
+ writeAndFlush(Commands.newTopicMigrated(type, resourceId,
brokerUrl, brokerUrlTls));
return true;
}
return false;
@@ -310,7 +308,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newTcClientConnectResponse(requestId,
error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -324,7 +322,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
txnID.getMostSigBits());
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
if (this.interceptor != null) {
this.interceptor.txnOpened(tcID, txnID.toString());
}
@@ -335,7 +333,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
BaseCommand command = Commands.newTxnResponse(requestId, tcID, error,
message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
}
@Override
@@ -344,7 +342,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
txnID.getMostSigBits());
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
if (this.interceptor != null) {
this.interceptor.txnEnded(txnID.toString(), txnAction);
}
@@ -356,7 +354,7 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
txnID.getMostSigBits(), error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
+ writeAndFlush(outBuf);
if (this.interceptor != null) {
this.interceptor.txnEnded(txnID.toString(), TxnAction.ABORT_VALUE);
}
@@ -378,7 +376,11 @@ public class PulsarCommandSenderImpl implements
PulsarCommandSender {
private void interceptAndWriteCommand(BaseCommand command) {
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
- cnx.ctx().writeAndFlush(outBuf);
+ writeAndFlush(outBuf);
+ }
+
+ private void writeAndFlush(ByteBuf outBuf) {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(cnx.ctx(), outBuf);
}
private void safeIntercept(BaseCommand command, ServerCnx cnx) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a61468c6469..c0357c14a20 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -32,7 +32,6 @@ import static
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
@@ -161,6 +160,7 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
@@ -304,11 +304,11 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
super.channelActive(ctx);
ConnectionController.State state =
connectionController.increaseConnection(remoteAddress);
if (!state.equals(ConnectionController.State.OK)) {
- ctx.writeAndFlush(Commands.newError(-1,
ServerError.NotAllowedError,
-
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
- ? "Reached the maximum number of
connections"
- : "Reached the maximum number of
connections on address" + remoteAddress))
- .addListener(ChannelFutureListener.CLOSE);
+ final ByteBuf msg = Commands.newError(-1,
ServerError.NotAllowedError,
+
state.equals(ConnectionController.State.REACH_MAX_CONNECTION)
+ ? "Reached the maximum number of connections"
+ : "Reached the maximum number of connections on
address" + remoteAddress);
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
return;
}
log.info("New connection from {}", remoteAddress);
@@ -480,7 +480,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug("[{}] Failed lookup topic {} due to pulsar service
is not ready: {} state", remoteAddress,
topicName,
this.service.getPulsar().getState().toString());
}
-
ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+ writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
"Failed due to pulsar service is not ready", requestId));
return;
}
@@ -491,7 +491,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final String msg = "Valid Proxy Client role should be provided
for lookup ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on
topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
-
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
+
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
lookupSemaphore.release();
return;
}
@@ -502,12 +502,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
getPrincipal(), getAuthenticationData(),
requestId,
advertisedListenerName).handle((lookupResponse, ex) -> {
if (ex == null) {
- ctx.writeAndFlush(lookupResponse);
+ writeAndFlush(lookupResponse);
} else {
// it should never happen
log.warn("[{}] lookup failed with error
{}, {}", remoteAddress, topicName,
ex.getMessage(), ex);
-
ctx.writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
+
writeAndFlush(newLookupErrorResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
lookupSemaphore.release();
@@ -516,14 +516,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
} else {
final String msg = "Client is not authorized to Lookup";
log.warn("[{}] {} with role {} on topic {}",
remoteAddress, msg, getPrincipal(), topicName);
-
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
+
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
lookupSemaphore.release();
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "lookup", getPrincipal(),
Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to
authorize lookup";
-
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
+
writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg,
requestId));
lookupSemaphore.release();
return null;
});
@@ -531,11 +531,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed lookup due to too many lookup-requests
{}", remoteAddress, topicName);
}
-
ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
+ writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests",
requestId));
}
}
+ private void writeAndFlush(ByteBuf cmd) {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+ }
+
@Override
protected void
handlePartitionMetadataRequest(CommandPartitionedTopicMetadata
partitionMetadata) {
checkArgument(state == State.Connected);
@@ -557,7 +561,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
partitionMetadata.getTopic(), remoteAddress, requestId,
this.service.getPulsar().getState().toString());
}
-
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
"Failed due to pulsar service is not ready", requestId));
return;
}
@@ -607,7 +611,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
} else {
final String msg = "Client is not authorized to Get
Partition Metadata";
log.warn("[{}] {} with role {} on topic {}",
remoteAddress, msg, getPrincipal(), topicName);
- ctx.writeAndFlush(
+ writeAndFlush(
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
requestId));
lookupSemaphore.release();
}
@@ -615,7 +619,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}).exceptionally(ex -> {
logAuthException(remoteAddress, "partition-metadata",
getPrincipal(), Optional.of(topicName), ex);
final String msg = "Exception occurred while trying to
authorize get Partition Metadata";
-
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
msg,
+
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
msg,
requestId));
lookupSemaphore.release();
return null;
@@ -657,7 +661,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
msg = createConsumerStatsResponse(consumer, requestId);
}
- ctx.writeAndFlush(msg);
+ writeAndFlush(msg);
}
ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
@@ -687,7 +691,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// complete the connect and sent newConnected command
private void completeConnect(int clientProtoVersion, String clientVersion,
boolean supportsTopicWatchers) {
- ctx.writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, supportsTopicWatchers));
+ writeAndFlush(Commands.newConnected(clientProtoVersion,
maxMessageSize, supportsTopicWatchers));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
if (log.isDebugEnabled()) {
@@ -763,7 +767,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
// auth not complete, continue auth with client side.
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
clientProtocolVersion));
+ writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
@@ -813,7 +817,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
try {
AuthData brokerData = authState.refreshAuthentication();
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod,
brokerData,
+ writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
getRemoteEndpointProtocolVersion()));
if (log.isDebugEnabled()) {
log.debug("[{}] Sent auth challenge to client to refresh
credentials with method: {}.",
@@ -849,8 +853,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug("Failed CONNECT from {} due to pulsar service is not
ready: {} state", remoteAddress,
this.service.getPulsar().getState().toString());
}
- ctx.writeAndFlush(
- Commands.newError(-1, ServerError.ServiceNotReady, "Failed
due to pulsar service is not ready"));
+ writeAndFlush(
+ Commands.newError(
+ -1,
+ ServerError.ServiceNotReady,
+ "Failed due to pulsar service is not ready")
+ );
close();
return;
}
@@ -961,7 +969,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, "connect", getPrincipal(),
Optional.empty(), e);
String msg = "Unable to authenticate";
- ctx.writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, msg));
+ writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, msg));
close();
}
}
@@ -985,13 +993,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
log.warn("[{}] Authentication failed: {} ", remoteAddress,
e.getMessage());
- ctx.writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, e.getMessage()));
+ writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, e.getMessage()));
close();
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
String msg = "Unable to handleAuthResponse";
log.warn("[{}] {} ", remoteAddress, msg, e);
- ctx.writeAndFlush(Commands.newError(-1, ServerError.UnknownError,
msg));
+ writeAndFlush(Commands.newError(-1, ServerError.UnknownError,
msg));
close();
}
}
@@ -1211,7 +1219,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg,
getPrincipal());
consumers.remove(consumerId, consumerFuture);
- ctx.writeAndFlush(Commands.newError(requestId,
ServerError.AuthorizationError, msg));
+ writeAndFlush(Commands.newError(requestId,
ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
@@ -1288,7 +1296,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (!isAuthorized) {
String msg = "Client is not authorized to Produce";
log.warn("[{}] {} with role {}", remoteAddress, msg,
getPrincipal());
- ctx.writeAndFlush(Commands.newError(requestId,
ServerError.AuthorizationError, msg));
+ writeAndFlush(Commands.newError(requestId,
ServerError.AuthorizationError, msg));
return null;
}
@@ -1658,7 +1666,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
Consumer consumer = consumerFuture.getNow(null);
consumer.messageAcked(ack).thenRun(() -> {
if (hasRequestId) {
- ctx.writeAndFlush(Commands.newAckResponse(
+ writeAndFlush(Commands.newAckResponse(
requestId, null, null, consumerId));
}
if (brokerInterceptor != null) {
@@ -1666,7 +1674,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
}).exceptionally(e -> {
if (hasRequestId) {
- ctx.writeAndFlush(Commands.newAckResponse(requestId,
+ writeAndFlush(Commands.newAckResponse(requestId,
BrokerServiceException.getClientErrorCode(e),
e.getMessage(), consumerId));
}
@@ -1825,7 +1833,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Producer> producerFuture = producers.get(producerId);
if (producerFuture == null) {
log.info("[{}] Producer {} was not registered on the connection",
remoteAddress, producerId);
- ctx.writeAndFlush(Commands.newSuccess(requestId));
+ writeAndFlush(Commands.newSuccess(requestId));
return;
}
@@ -1874,7 +1882,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
CompletableFuture<Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture == null) {
log.info("[{}] Consumer was not registered on the connection: {}",
consumerId, remoteAddress);
- ctx.writeAndFlush(Commands.newSuccess(requestId));
+ writeAndFlush(Commands.newSuccess(requestId));
return;
}
@@ -1941,7 +1949,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
consumer.getSubscription().getName());
} else {
-
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
+ writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
ServerError.MetadataError, "Consumer not found"));
}
}
@@ -1994,7 +2002,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
handleLastMessageIdFromCompactedLedger(persistentTopic,
requestId, partitionIndex,
markDeletePosition);
} else {
- ctx.writeAndFlush(Commands.newError(
+ writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to get batch size for entry " +
e.getMessage()));
}
@@ -2006,7 +2014,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
topic.getName(), subscriptionName, lastPosition,
partitionIndex);
}
-
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
lastPosition.getLedgerId(),
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
lastPosition.getLedgerId(),
lastPosition.getEntryId(), partitionIndex,
largestBatchIndex,
markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
@@ -2026,12 +2034,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
try {
largestBatchIndex =
calculateTheLastBatchIndexInBatch(metadata, payload);
} catch (IOException ioEx){
- ctx.writeAndFlush(Commands.newError(requestId,
ServerError.MetadataError,
+ writeAndFlush(Commands.newError(requestId,
ServerError.MetadataError,
"Failed to deserialize batched message from the
last entry of the compacted Ledger: "
+ ioEx.getMessage()));
return;
}
-
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
entry.getLedgerId(), entry.getEntryId(),
partitionIndex, largestBatchIndex,
markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
@@ -2039,13 +2047,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
} else {
// in this case, the ledgers been removed except the current
ledger
// and current ledger without any data
-
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+ writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
-1, -1, partitionIndex, -1,
markDeletePosition != null ?
markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ?
markDeletePosition.getEntryId() : -1));
}
}).exceptionally(ex -> {
- ctx.writeAndFlush(Commands.newError(
+ writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to read last entry of the compacted Ledger "
+ ex.getCause().getMessage()));
@@ -2401,12 +2409,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (log.isDebugEnabled()) {
log.debug("Send response success for add published
partition to txn request {}", requestId);
}
-
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
} else {
ex = handleTxnException(ex,
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
-
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
@@ -2468,13 +2476,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (throwable != null) {
log.error("handleEndTxnOnPartition fail!,
topic {}, txnId: [{}], "
+ "txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), throwable);
-
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+
writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId,
BrokerServiceException.getClientErrorCode(throwable),
throwable.getMessage(),
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
return;
}
-
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
});
@@ -2486,7 +2494,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.error("handleEndTxnOnPartition fail ! The
topic {} does not exist in broker, "
+ "txnId: [{}], txnAction:
[{}]", topic,
txnID, TxnAction.valueOf(txnAction));
-
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
ServerError.ServiceNotReady,
"The topic " + topic + " does not
exist in broker.",
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
@@ -2494,14 +2502,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.warn("handleEndTxnOnPartition fail ! The
topic {} has not been created, "
+ "txnId: [{}], txnAction:
[{}]",
topic, txnID,
TxnAction.valueOf(txnAction));
-
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+
writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic
{}, "
+ "txnId: [{}], txnAction: [{}]",
topic, txnID,
TxnAction.valueOf(txnAction),
e.getCause());
-
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+
writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(),
txnID.getMostSigBits()));
return null;
@@ -2511,7 +2519,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.error("handleEndTxnOnPartition fail ! topic {}, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
- ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+ writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(),
txnID.getMostSigBits()));
return null;
@@ -2543,7 +2551,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.warn("handleEndTxnOnSubscription fail! "
+ "topic {} subscription {} does not
exist. txnId: [{}], txnAction: [{}]",
optionalTopic.get().getName(), subName, txnID,
TxnAction.valueOf(txnAction));
- ctx.writeAndFlush(
+ writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits,
txnidMostBits));
return;
}
@@ -2555,13 +2563,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.error("handleEndTxnOnSubscription fail ! topic:
{}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]",
topic, subName,
txnID, TxnAction.valueOf(txnAction),
e.getCause());
-
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed: " +
e.getMessage()));
return;
}
- ctx.writeAndFlush(
+ writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits,
txnidMostBits));
});
} else {
@@ -2572,7 +2580,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.error("handleEndTxnOnSubscription fail!
The topic {} does not exist in broker, "
+ "subscription: {}, txnId:
[{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction));
-
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
ServerError.ServiceNotReady,
"The topic " + topic + " does not
exist in broker."));
@@ -2580,14 +2588,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.warn("handleEndTxnOnSubscription fail !
The topic {} has not been created, "
+ "subscription: {} txnId:
[{}], txnAction: [{}]",
topic, subName, txnID,
TxnAction.valueOf(txnAction));
-
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
+
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic
{}, subscription: {}"
+ "txnId: [{}], txnAction: [{}]",
topic, subName,
txnID, TxnAction.valueOf(txnAction),
e.getCause());
-
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+
writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
ServerError.ServiceNotReady,
e.getMessage()));
return null;
@@ -2597,7 +2605,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.error("handleEndTxnOnSubscription fail ! topic: {},
subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Handle end txn on subscription failed: " +
e.getMessage()));
@@ -2652,12 +2660,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug("Send response success for add published
partition to txn request {}",
requestId);
}
-
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
} else {
ex = handleTxnException(ex,
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
-
-
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getLeastSigBits(),
+ writeAndFlush(
+
Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
@@ -2743,7 +2751,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// removes producer-connection from map and send close command to
producer
safelyRemoveProducer(producer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
-
ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
+ writeAndFlush(Commands.newCloseProducer(producer.getProducerId(),
-1L));
} else {
close();
}
@@ -2755,7 +2763,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// removes consumer-connection from map and send close command to
consumer
safelyRemoveConsumer(consumer);
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
- ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(),
-1L));
+ writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(),
-1L));
} else {
close();
}
@@ -2998,13 +3006,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
if (requestCommand instanceof CommandLookupTopic) {
-
ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName,
+
writeAndFlush(Commands.newLookupErrorResponse(ServerError.InvalidTopicName,
"Invalid topic name: " + t.getMessage(), requestId));
} else if (requestCommand instanceof
CommandPartitionedTopicMetadata) {
-
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName,
+
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName,
"Invalid topic name: " + t.getMessage(), requestId));
} else {
- ctx.writeAndFlush(Commands.newError(requestId,
ServerError.InvalidTopicName,
+ writeAndFlush(Commands.newError(requestId,
ServerError.InvalidTopicName,
"Invalid topic name: " + t.getMessage()));
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 72212fe16c7..496652fed0b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -22,6 +22,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundInvoker;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
@@ -84,6 +85,7 @@ import
org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.intercept.InterceptException;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,7 +133,8 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
interceptCommand(cmd);
handlePartitionMetadataRequest(cmd.getPartitionMetadata());
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
+ writeAndFlush(ctx,
+
Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
e.getMessage(),
cmd.getPartitionMetadata().getRequestId()));
}
break;
@@ -205,7 +208,7 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
interceptCommand(cmd);
handleProducer(cmd.getProducer());
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newError(cmd.getProducer().getRequestId(),
+ writeAndFlush(ctx,
Commands.newError(cmd.getProducer().getRequestId(),
getServerError(e.getErrorCode()), e.getMessage()));
}
break;
@@ -218,7 +221,7 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
ByteBuf headersAndPayload = buffer.markReaderIndex();
handleSend(cmd.getSend(), headersAndPayload);
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newSendError(cmd.getSend().getProducerId(),
+ writeAndFlush(ctx,
Commands.newSendError(cmd.getSend().getProducerId(),
cmd.getSend().getSequenceId(),
getServerError(e.getErrorCode()), e.getMessage()));
}
break;
@@ -239,7 +242,7 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
interceptCommand(cmd);
handleSubscribe(cmd.getSubscribe());
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newError(cmd.getSubscribe().getRequestId(),
+ writeAndFlush(ctx,
Commands.newError(cmd.getSubscribe().getRequestId(),
getServerError(e.getErrorCode()), e.getMessage()));
}
break;
@@ -266,8 +269,13 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
interceptCommand(cmd);
handleSeek(cmd.getSeek());
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newError(cmd.getSeek().getRequestId(),
getServerError(e.getErrorCode()),
- e.getMessage()));
+ writeAndFlush(ctx,
+ Commands.newError(
+ cmd.getSeek().getRequestId(),
+ getServerError(e.getErrorCode()),
+ e.getMessage()
+ )
+ );
}
break;
@@ -326,7 +334,7 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
interceptCommand(cmd);
handleGetTopicsOfNamespace(cmd.getGetTopicsOfNamespace());
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newError(cmd.getGetTopicsOfNamespace().getRequestId(),
+ writeAndFlush(ctx,
Commands.newError(cmd.getGetTopicsOfNamespace().getRequestId(),
getServerError(e.getErrorCode()), e.getMessage()));
}
break;
@@ -342,7 +350,7 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
interceptCommand(cmd);
handleGetSchema(cmd.getGetSchema());
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newGetSchemaResponseError(cmd.getGetSchema().getRequestId(),
+ writeAndFlush(ctx,
Commands.newGetSchemaResponseError(cmd.getGetSchema().getRequestId(),
getServerError(e.getErrorCode()), e.getMessage()));
}
break;
@@ -358,7 +366,7 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
interceptCommand(cmd);
handleGetOrCreateSchema(cmd.getGetOrCreateSchema());
} catch (InterceptException e) {
-
ctx.writeAndFlush(Commands.newGetOrCreateSchemaResponseError(
+ writeAndFlush(ctx,
Commands.newGetOrCreateSchemaResponseError(
cmd.getGetOrCreateSchema().getRequestId(),
getServerError(e.getErrorCode()),
e.getMessage()));
}
@@ -731,4 +739,8 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
}
private static final Logger log =
LoggerFactory.getLogger(PulsarDecoder.class);
+
+ private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyChannelUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyChannelUtil.java
new file mode 100644
index 00000000000..c466822e205
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyChannelUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.VoidChannelPromise;
+
+/**
+ * Contains utility methods for working with Netty Channels.
+ */
+public final class NettyChannelUtil {
+
+ private NettyChannelUtil() {
+ }
+
+ /**
+ * Write and flush the message to the channel.
+ *
+ * The promise is an instance of {@link VoidChannelPromise} that properly
propagates exceptions up to the pipeline.
+ * Netty has many ad-hoc optimizations when the promise is an instance of
{@link VoidChannelPromise}.
+ * Lastly, it reduces pollution of useless {@link
io.netty.channel.ChannelPromise} objects created
+ * by the default write and flush method {@link
ChannelOutboundInvoker#writeAndFlush(Object)}.
+ * See https://stackoverflow.com/q/54169262 and
https://stackoverflow.com/a/9030420 for more details.
+ *
+ * @param ctx channel's context
+ * @param msg buffer to write in the channel
+ */
+ public static void writeAndFlushWithVoidPromise(ChannelOutboundInvoker
ctx, ByteBuf msg) {
+ ctx.writeAndFlush(msg, ctx.voidPromise());
+ }
+
+ /**
+ * Write and flush the message to the channel and then close the channel.
+ *
+ * This method is particularly helpful when the connection is in an
invalid state
+ * and therefore a new connection must be created to continue.
+ *
+ * @param ctx channel's context
+ * @param msg buffer to write in the channel
+ */
+ public static void writeAndFlushWithClosePromise(ChannelOutboundInvoker
ctx, ByteBuf msg) {
+ ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
+ }
+
+}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/NettyChannelUtilTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/NettyChannelUtilTest.java
new file mode 100644
index 00000000000..53a5f91de1d
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/NettyChannelUtilTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelOutboundInvoker;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.VoidChannelPromise;
+import java.nio.charset.StandardCharsets;
+import org.testng.annotations.Test;
+
+public class NettyChannelUtilTest {
+
+ @Test
+ public void testWriteAndFlushWithVoidPromise() {
+ final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+ final VoidChannelPromise voidChannelPromise =
mock(VoidChannelPromise.class);
+ when(ctx.voidPromise()).thenReturn(voidChannelPromise);
+ final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+ final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+ try {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, byteBuf);
+ verify(ctx).writeAndFlush(same(byteBuf), same(voidChannelPromise));
+ verify(ctx).voidPromise();
+ } finally {
+ byteBuf.release();
+ }
+ }
+
+ @Test
+ public void testWriteAndFlushWithClosePromise() {
+ final ChannelOutboundInvoker ctx = mock(ChannelOutboundInvoker.class);
+ final ChannelPromise promise = mock(ChannelPromise.class);
+
+ final byte[] data = "test".getBytes(StandardCharsets.UTF_8);
+ final ByteBuf byteBuf = Unpooled.wrappedBuffer(data, 0, data.length);
+ when(ctx.writeAndFlush(same(byteBuf))).thenReturn(promise);
+ try {
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, byteBuf);
+ verify(ctx).writeAndFlush(same(byteBuf));
+ verify(promise).addListener(same(ChannelFutureListener.CLOSE));
+ } finally {
+ byteBuf.release();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index d51f17b4731..4b5fef3a994 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -26,7 +26,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
@@ -65,6 +64,7 @@ import
org.apache.pulsar.common.util.NettyClientSslContextRefresher;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import
org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -238,8 +238,8 @@ public class DirectProxyHandler {
private void writeHAProxyMessage() {
if (proxyConnection.hasHAProxyMessage()) {
-
outboundChannel.writeAndFlush(encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage()))
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ final ByteBuf msg =
encodeProxyProtocolMessage(proxyConnection.getHAProxyMessage());
+ writeAndFlush(msg);
} else {
if (inboundChannel.remoteAddress() instanceof InetSocketAddress
&& inboundChannel.localAddress() instanceof
InetSocketAddress) {
@@ -252,8 +252,8 @@ public class DirectProxyHandler {
HAProxyMessage msg = new
HAProxyMessage(HAProxyProtocolVersion.V1, HAProxyCommand.PROXY,
HAProxyProxiedProtocol.TCP4, sourceAddress,
destinationAddress, sourcePort,
destinationPort);
- outboundChannel.writeAndFlush(encodeProxyProtocolMessage(msg))
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ final ByteBuf encodedMsg = encodeProxyProtocolMessage(msg);
+ writeAndFlush(encodedMsg);
msg.release();
}
}
@@ -323,11 +323,11 @@ public class DirectProxyHandler {
// Send the Connect command to broker
authenticationDataProvider =
authentication.getAuthData(remoteHostName);
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
- ByteBuf command;
- command = Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion, "Pulsar proxy",
- null /* target broker */, originalPrincipal,
clientAuthData, clientAuthMethod);
- outboundChannel.writeAndFlush(command)
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ ByteBuf command = Commands.newConnect(
+ authentication.getAuthMethodName(), authData,
protocolVersion,
+ "Pulsar proxy", null /* target broker */,
+ originalPrincipal, clientAuthData, clientAuthMethod);
+ writeAndFlush(command);
isTlsOutboundChannel =
ProxyConnection.isTlsChannel(inboundChannel);
}
@@ -358,8 +358,7 @@ public class DirectProxyHandler {
if (msg instanceof ByteBuf) {
ProxyService.BYTES_COUNTER.inc(((ByteBuf)
msg).readableBytes());
}
- inboundChannel.writeAndFlush(msg)
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ inboundChannel.writeAndFlush(msg,
inboundChannel.voidPromise());
if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel
== 0) {
if (!isTlsOutboundChannel &&
!DirectProxyHandler.this.proxyConnection.isTlsInboundChannel) {
@@ -412,8 +411,7 @@ public class DirectProxyHandler {
log.debug("{} Mutual auth {}", ctx.channel(),
authentication.getAuthMethodName());
}
- outboundChannel.writeAndFlush(request)
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ writeAndFlush(request);
} catch (Exception e) {
log.error("Error mutual verify", e);
}
@@ -495,5 +493,9 @@ public class DirectProxyHandler {
}
}
+ private void writeAndFlush(ByteBuf cmd) {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(outboundChannel, cmd);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(DirectProxyHandler.class);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index 5101978c3e2..6ec597ec1cf 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.server;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
import io.prometheus.client.Counter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -38,6 +39,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +115,7 @@ public class LookupProxyHandler {
log.debug("Lookup Request ID {} from {} rejected - {}.",
clientRequestId, clientAddress,
throttlingErrorMessage);
}
-
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+
writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
throttlingErrorMessage, clientRequestId));
}
@@ -122,7 +124,7 @@ public class LookupProxyHandler {
private void performLookup(long clientRequestId, String topic, String
brokerServiceUrl, boolean authoritative,
int numberOfRetries) {
if (numberOfRetries == 0) {
-
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
+
writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
"Reached max number of redirections", clientRequestId));
return;
}
@@ -131,7 +133,7 @@ public class LookupProxyHandler {
try {
brokerURI = new URI(brokerServiceUrl);
} catch (URISyntaxException e) {
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newLookupErrorResponse(ServerError.MetadataError,
e.getMessage(), clientRequestId));
return;
}
@@ -150,7 +152,7 @@ public class LookupProxyHandler {
clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] Failed to lookup topic {}: {}",
clientAddress, topic, t.getMessage());
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newLookupErrorResponse(getServerError(t),
t.getMessage(), clientRequestId));
} else {
String brokerUrl = connectWithTLS ? r.brokerUrlTls :
r.brokerUrl;
@@ -170,7 +172,7 @@ public class LookupProxyHandler {
+ " with clientReq Id '{}' and
lookup-broker {}",
addr, topic, clientRequestId, brokerUrl);
}
-
proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl,
brokerUrl, true,
+ writeAndFlush(Commands.newLookupResponse(brokerUrl,
brokerUrl, true,
LookupType.Connect, clientRequestId, true /* this
is coming from proxy */));
}
}
@@ -178,7 +180,7 @@ public class LookupProxyHandler {
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newLookupErrorResponse(getServerError(ex),
ex.getMessage(), clientRequestId));
return null;
});
@@ -202,7 +204,7 @@ public class LookupProxyHandler {
log.debug("PartitionMetaData Request ID {} from {} rejected -
{}.", clientRequestId, clientAddress,
throttlingErrorMessage);
}
-
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
+
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
throttlingErrorMessage, clientRequestId));
}
}
@@ -239,17 +241,17 @@ public class LookupProxyHandler {
if (t != null) {
log.warn("[{}] failed to get Partitioned metadata : {}",
topicName.toString(),
t.getMessage(), t);
-
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
+
writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
t.getMessage(), clientRequestId));
} else {
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newPartitionMetadataResponse(r.partitions,
clientRequestId));
}
proxyConnection.getConnectionPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
-
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(ex),
+
writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(ex),
ex.getMessage(), clientRequestId));
return null;
});
@@ -275,7 +277,7 @@ public class LookupProxyHandler {
log.debug("GetTopicsOfNamespace Request ID {} from {} rejected
- {}.", requestId, clientAddress,
throttlingErrorMessage);
}
- proxyConnection.ctx().writeAndFlush(Commands.newError(
+ writeAndFlush(Commands.newError(
requestId, ServerError.ServiceNotReady, throttlingErrorMessage
));
}
@@ -304,7 +306,7 @@ public class LookupProxyHandler {
String topicsHash,
CommandGetTopicsOfNamespace.Mode
mode) {
if (numberOfRetries == 0) {
-
proxyConnection.ctx().writeAndFlush(Commands.newError(clientRequestId,
ServerError.ServiceNotReady,
+ writeAndFlush(Commands.newError(clientRequestId,
ServerError.ServiceNotReady,
"Reached max number of redirections"));
return;
}
@@ -329,10 +331,10 @@ public class LookupProxyHandler {
if (t != null) {
log.warn("[{}] Failed to get TopicsOfNamespace {}: {}",
clientAddress, namespaceName, t.getMessage());
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newError(clientRequestId, getServerError(t),
t.getMessage()));
} else {
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newGetTopicsOfNamespaceResponse(r.getTopics(), r.getTopicsHash(),
r.isFiltered(),
r.isChanged(), clientRequestId));
}
@@ -341,7 +343,7 @@ public class LookupProxyHandler {
proxyConnection.getConnectionPool().releaseConnection(clientCnx);
}).exceptionally(ex -> {
// Failed to connect to backend broker
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newError(clientRequestId, getServerError(ex),
ex.getMessage()));
return null;
});
@@ -384,10 +386,10 @@ public class LookupProxyHandler {
clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t)
-> {
if (t != null) {
log.warn("[{}] Failed to get schema {}: {}",
clientAddress, topic, t);
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newError(clientRequestId, getServerError(t),
t.getMessage()));
} else {
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newGetSchemaResponse(clientRequestId, r));
}
@@ -395,7 +397,7 @@ public class LookupProxyHandler {
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newError(clientRequestId, getServerError(ex),
ex.getMessage()));
return null;
});
@@ -414,7 +416,7 @@ public class LookupProxyHandler {
availableBroker = discoveryProvider.nextBroker();
} catch (Exception e) {
log.warn("[{}] Failed to get next active broker {}",
clientAddress, e.getMessage(), e);
- proxyConnection.ctx().writeAndFlush(Commands.newError(
+ writeAndFlush(Commands.newError(
clientRequestId, ServerError.ServiceNotReady,
e.getMessage()
));
return null;
@@ -427,7 +429,7 @@ public class LookupProxyHandler {
try {
brokerURI = new URI(brokerServiceUrl);
} catch (URISyntaxException e) {
- proxyConnection.ctx().writeAndFlush(
+ writeAndFlush(
Commands.newError(clientRequestId,
ServerError.MetadataError, e.getMessage()));
return null;
}
@@ -446,5 +448,10 @@ public class LookupProxyHandler {
return responseError;
}
+ private void writeAndFlush(ByteBuf cmd) {
+ final ChannelHandlerContext ctx = proxyConnection.ctx();
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(LookupProxyHandler.class);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 56fbe946069..fe4d29ac4ab 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -71,6 +71,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
+import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -261,8 +262,8 @@ public class ProxyConnection extends PulsarHandler {
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
ProxyService.BYTES_COUNTER.inc(bytes);
}
- directProxyHandler.outboundChannel.writeAndFlush(msg)
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ directProxyHandler.outboundChannel
+ .writeAndFlush(msg,
directProxyHandler.outboundChannel.voidPromise());
if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel
== 0) {
if (!directProxyHandler.isTlsOutboundChannel &&
!isTlsInboundChannel) {
@@ -345,10 +346,9 @@ public class ProxyConnection extends PulsarHandler {
state = State.Closing;
LOG.warn("[{}] Target broker '{}' isn't available.
authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl, authMethod,
clientAuthRole);
- ctx()
- .writeAndFlush(
- Commands.newError(-1,
ServerError.ServiceNotReady, "Target broker isn't available."))
- .addListener(ChannelFutureListener.CLOSE);
+ final ByteBuf msg = Commands.newError(-1,
+ ServerError.ServiceNotReady, "Target broker isn't
available.");
+ writeAndFlushAndClose(msg);
return;
}
@@ -369,11 +369,9 @@ public class ProxyConnection extends PulsarHandler {
LOG.error("[{}] Error validating target broker
'{}'. authenticated with {} role {}.",
remoteAddress, proxyToBrokerUrl,
authMethod, clientAuthRole, throwable);
}
- ctx()
- .writeAndFlush(
- Commands.newError(-1,
ServerError.ServiceNotReady,
- "Target broker cannot be
validated."))
- .addListener(ChannelFutureListener.CLOSE);
+ final ByteBuf msg = Commands.newError(-1,
ServerError.ServiceNotReady,
+ "Target broker cannot be validated.");
+ writeAndFlushAndClose(msg);
return null;
});
} else {
@@ -382,8 +380,8 @@ public class ProxyConnection extends PulsarHandler {
// partitions metadata lookups
state = State.ProxyLookupRequests;
lookupProxyHandler = new LookupProxyHandler(service, this);
-
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise, false))
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ final ByteBuf msg =
Commands.newConnected(protocolVersionToAdvertise, false);
+ writeAndFlush(msg);
}
}
@@ -394,9 +392,9 @@ public class ProxyConnection extends PulsarHandler {
state = State.ProxyConnectionToBroker;
int maxMessageSize =
connected.hasMaxMessageSize() ?
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
-
ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(),
maxMessageSize,
- connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatchers()))
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ final ByteBuf msg =
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
+ connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatchers());
+ writeAndFlush(msg);
} else {
LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ "Closing connection to broker '{}'.",
@@ -445,8 +443,8 @@ public class ProxyConnection extends PulsarHandler {
}
// auth not complete, continue auth with client side.
- ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData,
protocolVersionToAdvertise))
- .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData,
protocolVersionToAdvertise);
+ writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
@@ -524,8 +522,8 @@ public class ProxyConnection extends PulsarHandler {
doAuthentication(clientData);
} catch (Exception e) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
- ctx.writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, "Failed to authenticate"))
- .addListener(ChannelFutureListener.CLOSE);
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, "Failed to authenticate");
+ writeAndFlushAndClose(msg);
}
}
@@ -589,10 +587,10 @@ public class ProxyConnection extends PulsarHandler {
});
}
} catch (Exception e) {
- String msg = "Unable to handleAuthResponse";
- LOG.warn("[{}] {} ", remoteAddress, msg, e);
- ctx.writeAndFlush(Commands.newError(-1,
ServerError.AuthenticationError, msg))
- .addListener(ChannelFutureListener.CLOSE);
+ String errorMsg = "Unable to handleAuthResponse";
+ LOG.warn("[{}] {} ", remoteAddress, errorMsg, e);
+ final ByteBuf msg = Commands.newError(-1,
ServerError.AuthenticationError, errorMsg);
+ writeAndFlushAndClose(msg);
}
}
@@ -727,4 +725,12 @@ public class ProxyConnection extends PulsarHandler {
&& pulsarServiceUrl.startsWith(expectedPrefix)
&& pulsarServiceUrl.startsWith(brokerHostPort,
expectedPrefix.length());
}
+
+ private void writeAndFlush(ByteBuf cmd) {
+ NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
+ }
+
+ private void writeAndFlushAndClose(ByteBuf cmd) {
+ NettyChannelUtil.writeAndFlushWithClosePromise(ctx, cmd);
+ }
}