This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 164add6236e [improve][proxy] Remove unnecessary executor callback; use
assert (#19670)
164add6236e is described below
commit 164add6236eccb34f16683e86904f8d8d0b90c54
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Mar 20 13:55:02 2023 -0500
[improve][proxy] Remove unnecessary executor callback; use assert (#19670)
### Motivation
The `DirectProxyHandler` and the `ProxyConnection` are run in the same
event loop to prevent context switching. As such, we do not need to schedule an
event onto an event loop that is in fact the same event loop. Further,
scheduling on that event loop could have resulted in uncaught failures because
the method was run without any error handling.
Additionally, we can use `assert` to verify that we are in the event loop.
Netty makes extensive use of this paradigm, as described in this PR #19653. The
primary benefit here is that we skip some unnecessary volatile reads when
running the code in production.
### Modifications
* Replace `checkState` with `assert` in `ProxyConnection` class
* Remove unnecessary event execution in callback when starting up the
`DirectProxyHandler`.
### Verifying this change
This change is covered by the assertions that are added.
### Does this pull request potentially affect one of the following parts:
This is a minor improvement that should not break anything.
### Documentation
- [x] `doc-not-needed`
### Matching PR in forked repository
PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/33
---
.../java/org/apache/pulsar/proxy/server/ProxyConnection.java | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 5a53f6ec014..1e19d760c6d 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
@@ -380,7 +379,7 @@ public class ProxyConnection extends PulsarHandler {
}
private void handleBrokerConnected(DirectProxyHandler directProxyHandler,
CommandConnected connected) {
- checkState(ctx.executor().inEventLoop(), "This method should be called
in the event loop");
+ assert ctx.executor().inEventLoop();
if (state == State.ProxyConnectingToBroker && ctx.channel().isOpen()
&& this.directProxyHandler == null) {
this.directProxyHandler = directProxyHandler;
state = State.ProxyConnectionToBroker;
@@ -401,7 +400,7 @@ public class ProxyConnection extends PulsarHandler {
}
private void connectToBroker(InetSocketAddress brokerAddress) {
- checkState(ctx.executor().inEventLoop(), "This method should be called
in the event loop");
+ assert ctx.executor().inEventLoop();
DirectProxyHandler directProxyHandler = new
DirectProxyHandler(service, this);
directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
protocolVersionToAdvertise);
}
@@ -409,10 +408,13 @@ public class ProxyConnection extends PulsarHandler {
public void brokerConnected(DirectProxyHandler directProxyHandler,
CommandConnected connected) {
try {
final CommandConnected finalConnected = new
CommandConnected().copyFrom(connected);
- ctx.executor().execute(() ->
handleBrokerConnected(directProxyHandler, finalConnected));
+ handleBrokerConnected(directProxyHandler, finalConnected);
} catch (RejectedExecutionException e) {
LOG.error("Event loop was already closed. Closing broker
connection.", e);
directProxyHandler.close();
+ } catch (AssertionError e) {
+ LOG.error("Failed assertion, closing direct proxy handler.", e);
+ directProxyHandler.close();
}
}