lhotari commented on code in PR #23123:
URL: https://github.com/apache/pulsar/pull/23123#discussion_r1703589394


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -3605,19 +3605,54 @@ public String clientSourceAddressAndPort() {
 
     @Override
     public CompletableFuture<Optional<Boolean>> checkConnectionLiveness() {
+        if (!isActive()) {
+            return CompletableFuture.completedFuture(Optional.of(false));
+        }
         if (connectionLivenessCheckTimeoutMillis > 0) {
             return NettyFutureUtil.toCompletableFuture(ctx.executor().submit(() -> {
+                if (!isActive()) {
+                    return CompletableFuture.completedFuture(Optional.of(false));
+                }
                 if (connectionCheckInProgress != null) {
                     return connectionCheckInProgress;
                 } else {
                     final CompletableFuture<Optional<Boolean>> finalConnectionCheckInProgress =
                             new CompletableFuture<>();
                     connectionCheckInProgress = finalConnectionCheckInProgress;
                     ctx.executor().schedule(() -> {
-                        if (finalConnectionCheckInProgress == connectionCheckInProgress
-                                && !finalConnectionCheckInProgress.isDone()) {
+                        if (!isActive()) {
+                            finalConnectionCheckInProgress.complete(Optional.of(false));
+                            return;
+                        }
+                        if (finalConnectionCheckInProgress.isDone()) {
+                            return;
+                        }
+                        if (finalConnectionCheckInProgress == connectionCheckInProgress) {
+                            /**
+                             * {@link #connectionCheckInProgress} will be completed when
+                             * {@link #channelInactive(ChannelHandlerContext)} event occurs, so skip set it here.
+                             */
                             log.warn("[{}] Connection check timed out. Closing connection.", this.toString());
                             ctx.close();
+                        } else {
+                            /**
+                             * Scenarios that changing {@link #connectionCheckInProgress}.
+                             *   1. {@link #connectionCheckInProgress} will be changed to "null" after a successful
+                             *     "Ping & Pong"
+                             *   2. {@link #connectionCheckInProgress} will be set to a new future the next time
+                             *     {@link #checkConnectionLiveness()} when it is null.
+                             *
+                             * Once {@link #connectionCheckInProgress} is not equal to
+                             *   {@link #finalConnectionCheckInProgress}, it means Scenario 1 occurred before. If
+                             *   "receiving Pong" and "the current scheduled task" are executing at the same time,
+                             *   this log will be printed. Since the two events will be executing at the same thread,
+                             *   the concurrency scenario can not occur, so this log's level is "ERROR".
+                             */
+                            log.error("[{}] Connection check might be success, because the variable"
+                                    + " connectionCheckInProgress has been override by the following check. But this"
+                                    + " scenario is not expected",
+                                    this.toString());
+                            finalConnectionCheckInProgress.complete(Optional.of(true));

Review Comment:
   Why would this be needed?
   The intention is that `connectionCheckInProgress` is modified only in the event loop of `ServerCnx`, so every read and write of it should be serialized on a single thread, which makes the updates effectively atomic. Do you see a case where that wouldn't hold?
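
   To illustrate the event-loop confinement I have in mind, here is a minimal sketch. It is not Pulsar code: `EventLoopConfinementSketch`, `startCheck`, `onPong` and the `checkInProgress` field are hypothetical names, and a single-threaded `ExecutorService` stands in for the Netty event loop. The assumption is that every access to the field goes through that one thread.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a connection object; NOT the actual ServerCnx.
public class EventLoopConfinementSketch {
    // A single-threaded executor plays the role of the Netty event loop (assumption).
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Plain field, no volatile and no locks: it is only read and written on eventLoop.
    private CompletableFuture<Optional<Boolean>> checkInProgress;

    // Starts a liveness check, or joins the one already in progress.
    public CompletableFuture<Optional<Boolean>> startCheck() {
        CompletableFuture<Optional<Boolean>> result = new CompletableFuture<>();
        eventLoop.execute(() -> {
            if (checkInProgress == null) {
                checkInProgress = new CompletableFuture<>();
            }
            // Forward the outcome of the shared in-progress future to the caller's future.
            checkInProgress.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            });
        });
        return result;
    }

    // Simulates receiving a Pong: completes the pending check and clears the field.
    public void onPong() {
        eventLoop.execute(() -> {
            if (checkInProgress != null) {
                checkInProgress.complete(Optional.of(true));
                checkInProgress = null;
            }
        });
    }

    public void shutdown() {
        eventLoop.shutdown();
    }

    public static void main(String[] args) throws Exception {
        EventLoopConfinementSketch cnx = new EventLoopConfinementSketch();
        CompletableFuture<Optional<Boolean>> first = cnx.startCheck();
        CompletableFuture<Optional<Boolean>> second = cnx.startCheck();
        cnx.onPong();
        // Both callers observe the result of the same in-progress check.
        System.out.println(first.get() + " " + second.get());
        cnx.shutdown();
    }
}
```

   Because the tasks run one at a time on the same thread, the "check whether the field still points at my future" step and the "Pong clears the field" step cannot interleave in this sketch, which is why I would not expect the new `else` branch to be reachable.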



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.