[ 
https://issues.apache.org/jira/browse/ARTEMIS-4476?focusedWorklogId=889519&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-889519
 ]

ASF GitHub Bot logged work on ARTEMIS-4476:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Nov/23 14:46
            Start Date: 08/Nov/23 14:46
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4656:
URL: https://github.com/apache/activemq-artemis/pull/4656#discussion_r1386730159


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java:
##########
@@ -73,17 +78,54 @@ public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
 
       destroyed = true;
 
-      //filter it like the other protocols
-      if (!(me instanceof ActiveMQRemoteDisconnectException)) {
-         ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(), me.getMessage(), me.getType());
+      if (logger.isInfoEnabled()) {
+         try {
+            logger.debug("Connection failure detected. amqpConnection.getHandler().getConnection().getRemoteState() = {}, remoteIP={}", amqpConnection.getHandler().getConnection().getRemoteState(), amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress());

Review Comment:
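   The gate doesn't match the actual logging: this block is guarded by logger.isInfoEnabled(), but the statement inside it logs with logger.debug(...).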
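A minimal sketch of a gate that matches its statement, under stated assumptions: java.util.logging stands in for the SLF4J-style logger used in the diff (FINE playing the role of debug), and the two suppliers are hypothetical placeholders for the getRemoteState() and getRemoteAddress() chains, either of which might throw on a half-torn-down connection. GateSketch and logFailureDetected are illustrative names only.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: java.util.logging stands in for the broker's SLF4J-style logger.
class GateSketch {
   static final Logger LOGGER = Logger.getLogger(GateSketch.class.getName());

   /**
    * Logs the failure at FINE (the JUL counterpart of debug) only when FINE is enabled.
    * The suppliers are hypothetical stand-ins for the remote-state and remote-address lookups.
    * Returns true if the guarded statement was attempted.
    */
   static boolean logFailureDetected(Supplier<String> remoteState, Supplier<String> remoteIP) {
      // The gate checks the same level the statement logs at.
      if (!LOGGER.isLoggable(Level.FINE)) {
         return false;
      }
      try {
         LOGGER.fine("Connection failure detected. remoteState=" + remoteState.get()
                        + ", remoteIP=" + remoteIP.get());
      } catch (RuntimeException e) {
         // A failing diagnostic lookup must not stop the failure handling that follows.
         LOGGER.log(Level.WARNING, e.getMessage(), e);
      }
      return true;
   }
}
```

The explicit gate earns its place here because the arguments themselves are evaluated eagerly: with a parameterized debug call only the message formatting is deferred, so a risky or costly lookup still runs unless the level check comes first.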



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java:
##########
@@ -73,17 +78,54 @@ public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
 
       destroyed = true;
 
-      //filter it like the other protocols
-      if (!(me instanceof ActiveMQRemoteDisconnectException)) {
-         ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(), me.getMessage(), me.getType());
+      if (logger.isInfoEnabled()) {
+         try {
+            logger.debug("Connection failure detected. amqpConnection.getHandler().getConnection().getRemoteState() = {}, remoteIP={}", amqpConnection.getHandler().getConnection().getRemoteState(), amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress());
+         } catch (Throwable e) { // just to avoid a possible NPE from the debug statement itself
+            logger.debug(e.getMessage(), e);
+            logger.warn(e.getMessage(), e);
+         }
       }
 
-      // Then call the listeners
-      callFailureListeners(me, scaleDownTargetNodeID);
+      try {
+         if (amqpConnection.getHandler().getConnection().getRemoteState() != EndpointState.CLOSED) {
+            // A remote close was received on the client, on that case it's just a normal operation and we don't need to log this.
+            ActiveMQClientLogger.LOGGER.connectionFailureDetected(amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress(), me.getMessage(), me.getType());
+         }
+      } catch (Throwable e) { // avoiding NPEs from te logging statement. I don't think this would happen, but just in case
+         logger.warn(e.getMessage(), e);
+      }
 
-      callClosingListeners();
+      amqpConnection.runNow(() -> {
+         // Then call the listeners
+         callFailureListeners(me, scaleDownTargetNodeID);
 
-      internalClose();
+         callClosingListeners();
+
+         internalClose();
+      });
+   }
+
+   @Override
+   public void close() {
+      if (destroyed) {
+         return;
+      }
+
+      destroyed = true;
+
+      if (logger.isInfoEnabled()) {
+         try {
+            logger.debug("Connection regular close. amqpConnection.getHandler().getConnection().getRemoteState() = {}, remoteIP={}", amqpConnection.getHandler().getConnection().getRemoteState(), amqpConnection.getConnectionCallback().getTransportConnection().getRemoteAddress());
+         } catch (Throwable e) { // just to avoid a possible NPE from the debug statement itself

Review Comment:
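   The gate doesn't match the actual logging here either: close() checks logger.isInfoEnabled(), but the statement inside logs with logger.debug(...).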
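The fail() hunk above moves callFailureListeners, callClosingListeners and internalClose into amqpConnection.runNow(...). Assuming runNow executes the task on the connection's own event loop (inline when the caller is already on it, queued otherwise), failure cleanup can no longer interleave with frame processing on that loop. A minimal sketch of that dispatch pattern over a single-threaded ExecutorService; ConnectionLoop and its methods are hypothetical names, not the Artemis API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical single-threaded "event loop" owned by one connection (not the Artemis API). */
class ConnectionLoop {
   private final ExecutorService executor = Executors.newSingleThreadExecutor();
   private volatile Thread loopThread;

   ConnectionLoop() {
      // The first task records which thread backs the loop; later tasks run after it (FIFO).
      executor.execute(() -> loopThread = Thread.currentThread());
   }

   boolean inEventLoop() {
      return Thread.currentThread() == loopThread;
   }

   /** Runs inline when already on the loop, otherwise queues the task behind pending frames. */
   void runNow(Runnable task) {
      if (inEventLoop()) {
         task.run();
      } else {
         executor.execute(task);
      }
   }

   /** Stops accepting tasks and waits for the queued ones to finish. */
   boolean shutdownAndWait() {
      executor.shutdown();
      try {
         return executor.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }
}
```

In this sketch, running inline when already on the loop means that a failure detected from inside a frame handler is cleaned up before that handler continues, instead of being queued behind it.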



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/ExecutorNettyAdapter.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/** Test cases may supply a simple executor instead of the real Netty Executor
+ *  On that case this is a simple adapter for what's needed from these tests.
+ *  Not intended to be used in production.
+ *
+ *  TODO: This could be refactored out of the main codebase but at a high cost.
+ *        We may do it some day if we find an easy way that won't clutter the code too much.
+ *  */

Review Comment:
   This is in the main code; should it be, if it's only used by tests? I feel like we deleted something like this recently.
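Judging by its imports, ExecutorNettyAdapter adapts an ArtemisExecutor to Netty's EventLoop interface so tests can drive a connection without a real Netty loop. A minimal sketch of that adapter idea, assuming a connection only needs execute and inEventLoop from its loop; LoopLike and ExecutorLoopAdapter are hypothetical names. An adapter over a narrow interface like this stays small enough to live alongside the tests.

```java
import java.util.concurrent.Executor;

/** Hypothetical narrow view of an event loop: only what a connection needs from it. */
interface LoopLike extends Executor {
   boolean inEventLoop();
}

/** Adapts any Executor (for example a test's serial executor) to LoopLike. */
final class ExecutorLoopAdapter implements LoopLike {
   private final Executor delegate;
   // True only while one of this adapter's own tasks is running on the current thread.
   private final ThreadLocal<Boolean> running = ThreadLocal.withInitial(() -> Boolean.FALSE);

   ExecutorLoopAdapter(Executor delegate) {
      this.delegate = delegate;
   }

   @Override
   public void execute(Runnable task) {
      delegate.execute(() -> {
         // Save and restore the flag so nested tasks on a direct executor don't clear it early.
         boolean previous = running.get();
         running.set(Boolean.TRUE);
         try {
            task.run();
         } finally {
            running.set(previous);
         }
      });
   }

   @Override
   public boolean inEventLoop() {
      return running.get();
   }
}
```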





Issue Time Tracking
-------------------

    Worklog Id:     (was: 889519)
    Time Spent: 50m  (was: 40m)

> Connection Failure Race Conditions in AMQP and Core
> ---------------------------------------------------
>
>                 Key: ARTEMIS-4476
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4476
>             Project: ActiveMQ Artemis
>          Issue Type: Task
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Failure detection can race with the processing of client packets (or frames,
> in the case of AMQP).
> This is because Netty detects the failure and removes the connection objects
> while packets are still being processed.
> I was not able to reproduce this particular issue, but I have seen a case
> in a memory dump where a consumer was created after the connection had
> already dropped, leaving the consumer isolated without any communication
> with clients.
> I can see how these races could lead to that particular case.
> I am adding stress tests that exercise connection failure, and with them I
> was able to reproduce other issues.
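The scenario in the description, a consumer created after its connection has already dropped, arises because the failure path tears connection state down while packet handling may still be running on another thread. A minimal sketch, not the broker's code, of why serializing both on one executor closes that window: every consumer is either created before the failure (and then closed by it) or rejected after it, so none is left orphaned. FailureRaceSketch, createConsumer, fail and await are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical model of one connection; not the broker's actual classes. */
class FailureRaceSketch {
   // Packet handling and failure handling both run here, one task at a time.
   private final ExecutorService connectionExecutor = Executors.newSingleThreadExecutor();
   // Only touched from connectionExecutor, so no extra locking is needed.
   private final List<String> consumers = new ArrayList<>();
   private boolean failed;

   /** Packet handler: refuses to create a consumer once the connection has failed. */
   Future<Boolean> createConsumer(String name) {
      return connectionExecutor.submit(() -> {
         if (failed) {
            return false;
         }
         consumers.add(name);
         return true;
      });
   }

   /** Failure path: sees every consumer created before it and closes them all. */
   Future<Integer> fail() {
      return connectionExecutor.submit(() -> {
         failed = true;
         int closed = consumers.size();
         consumers.clear(); // stands in for closing each consumer
         return closed;
      });
   }

   int liveConsumers() {
      Future<Integer> size = connectionExecutor.submit(() -> consumers.size());
      return await(size);
   }

   void shutdown() {
      connectionExecutor.shutdown();
   }

   /** Blocks for a result; checked exceptions are rethrown unchecked to keep the sketch short. */
   static <T> T await(Future<T> future) {
      try {
         return future.get();
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      } catch (ExecutionException e) {
         throw new IllegalStateException(e.getCause());
      }
   }
}
```

The key property is that the failed check and the list mutation happen on the same thread; checking on one thread and creating on another would reopen the window the issue describes.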



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
