michaeljmarshall commented on code in PR #15366:
URL: https://github.com/apache/pulsar/pull/15366#discussion_r861109478
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -313,6 +334,43 @@ private synchronized void completeConnect(AuthData
clientData) throws PulsarClie
}
}
+ private void handleBrokerConnected(DirectProxyHandler directProxyHandler,
CommandConnected connected) {
+ checkState(ctx.executor().inEventLoop(), "This method should be called
in the event loop");
+ if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen()
&& this.directProxyHandler == null) {
+ this.directProxyHandler = directProxyHandler;
+ state = State.ProxyConnectionToBroker;
+ int maxMessageSize =
+ connected.hasMaxMessageSize() ?
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
+
ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(),
maxMessageSize))
+
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ } else {
+ LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ + "Closing connection to broker '{}'.",
+ remoteAddress, ctx.channel().isOpen() ? "open" : "already
closed",
+ state != State.ProxyConnectingToBroker ? "invalid state "
+ state : "state " + state,
+ proxyToBrokerUrl);
+ directProxyHandler.close();
Review Comment:
What is the purpose of closing this here? I'd expect the `channelInactive`
method to close it. Is it an optimization to close the channel as early as
possible? If so, should we set it to `null`?
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##########
@@ -216,18 +234,26 @@ public void channelRead(final ChannelHandlerContext ctx,
Object msg) throws Exce
break;
case ProxyConnectionToBroker:
- // Pass the buffer to the outbound connection and schedule next
read
- // only if we can write on the connection
- ProxyService.OPS_COUNTER.inc();
- if (msg instanceof ByteBuf) {
- int bytes = ((ByteBuf) msg).readableBytes();
-
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
- ProxyService.BYTES_COUNTER.inc(bytes);
+ if (directProxyHandler != null) {
+ ProxyService.OPS_COUNTER.inc();
+ if (msg instanceof ByteBuf) {
+ int bytes = ((ByteBuf) msg).readableBytes();
+
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
+ ProxyService.BYTES_COUNTER.inc(bytes);
+ }
+ directProxyHandler.outboundChannel.writeAndFlush(msg)
+
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ } else {
+ LOG.warn("Received message of type {} while connection to
broker is missing in state {}. "
+ + "Dropping the input message (readable
bytes={}).", msg.getClass(), state,
+ msg instanceof ByteBuf ? ((ByteBuf)
msg).readableBytes() : -1);
}
- directProxyHandler.outboundChannel.writeAndFlush(msg)
-
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;
-
+ case ProxyConnectingToBroker:
+ LOG.warn("Received message of type {} while connecting to broker. "
+ + "Dropping the input message (readable
bytes={}).", msg.getClass(),
+ msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() :
-1);
Review Comment:
Depending on the type of protocol message, dropping it could be problematic.
For example, if the protocol message is a delivered message, it could lead to
out of order persistence, as we saw in the broker.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]