This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 6685c0c347 QPID-8700 - [Broker-J] 
NonBlockingConnection#shutdownFinalWrite() may loop infinitely (#299)
6685c0c347 is described below

commit 6685c0c347093b025adb074327752826f149c437
Author: Daniil Kirilyuk <daniel.kiril...@gmail.com>
AuthorDate: Tue Jul 29 10:06:53 2025 +0200

    QPID-8700 - [Broker-J] NonBlockingConnection#shutdownFinalWrite() may loop 
infinitely (#299)
---
 .../apache/qpid/server/model/port/AmqpPort.java    |  12 +
 .../server/transport/NonBlockingConnection.java    |  15 +-
 .../transport/NonBlockingConnectionTest.java       | 308 +++++++++++++++++++++
 .../server/transport/TCPandSSLTransportTest.java   |   2 +
 4 files changed, 335 insertions(+), 2 deletions(-)

diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java 
b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index b51f90d710..20afa83d11 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -159,6 +159,18 @@ public interface AmqpPort<X extends AmqpPort<X>> extends 
Port<X>
             description = "The connection property enrichers to apply to 
connections created on this port.")
     String DEFAULT_CONNECTION_PROTOCOL_ENRICHERS = "[ \"STANDARD\" ] ";
 
+    String FINAL_WRITE_THRESHOLD = "qpid.port.final_write_threshold";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = FINAL_WRITE_THRESHOLD, description = 
"Threshold to check for final write timeout.")
+    int DEFAULT_FINAL_WRITE_THRESHOLD = 100;
+
+    String FINAL_WRITE_TIMEOUT = "qpid.port.final_write_timeout";
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = FINAL_WRITE_TIMEOUT,
+            description = "Maximum time allowed for a connection to be 
closed." +
+            " If the connection does not close this time, it will be aborted.")
+    long DEFAULT_FINAL_WRITE_TIMEOUT = 1000L;
+
     @ManagedAttribute( defaultValue = AmqpPort.DEFAULT_AMQP_TCP_NO_DELAY )
     boolean isTcpNoDelay();
 
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 
b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index fdc99dd0eb..aab4806a2c 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -58,6 +58,8 @@ public class NonBlockingConnection implements 
ServerNetworkConnection, ByteBuffe
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final ProtocolEngine _protocolEngine;
     private final Runnable _onTransportEncryptionAction;
+    private final int _finalWriteThreshold;
+    private final long _finalWriteTimeout;
 
     private volatile boolean _fullyWritten = true;
 
@@ -110,7 +112,8 @@ public class NonBlockingConnection implements 
ServerNetworkConnection, ByteBuffe
         {
             _delegate = new NonBlockingConnectionUndecidedDelegate(this);
         }
-
+        _finalWriteThreshold = port.getContextValue(Integer.class, 
AmqpPort.FINAL_WRITE_THRESHOLD);
+        _finalWriteTimeout = port.getContextValue(Long.class, 
AmqpPort.FINAL_WRITE_TIMEOUT);
     }
 
     String getThreadName()
@@ -414,8 +417,16 @@ public class NonBlockingConnection implements 
ServerNetworkConnection, ByteBuffe
     {
         try
         {
-            while(!doWrite())
+            final long startTime = System.currentTimeMillis();
+            int cnt = 0;
+            while (!doWrite())
             {
+                if (cnt % _finalWriteThreshold == 0 && 
System.currentTimeMillis() - startTime > _finalWriteTimeout)
+                {
+                    final long executionTime = System.currentTimeMillis() - 
startTime;
+                    throw new IOException("Failed to perform final write to 
connection after " + executionTime + " ms timeout");
+                }
+                cnt++;
             }
         }
         catch (IOException e)
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java
new file mode 100644
index 0000000000..b18416a8fd
--- /dev/null
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.Context;
+import ch.qos.logback.core.LogbackException;
+import ch.qos.logback.core.filter.Filter;
+import ch.qos.logback.core.spi.FilterReply;
+import ch.qos.logback.core.status.Status;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Broker;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.transport.network.TransportEncryption;
+
+class NonBlockingConnectionTest
+{
+    private static final TestAppender appender = new 
TestAppender(NonBlockingConnection.class);
+    private static final ch.qos.logback.classic.Logger ROOT_LOGGER =
+            (ch.qos.logback.classic.Logger) 
LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
+
+    private final SelectorThread.SelectionTask selectionTask = 
mock(SelectorThread.SelectionTask.class);
+
+    private NonBlockingConnection _nonBlockingConnection;
+
+    @BeforeAll
+    static void beforeAll()
+    {
+        ROOT_LOGGER.addAppender(appender);
+    }
+
+    @AfterAll
+    static void afterAll()
+    {
+        ROOT_LOGGER.detachAppender(appender.getName());
+    }
+
+    @BeforeEach
+    void beforeEach()
+    {
+        final SocketAddress localAddress = mock(SocketAddress.class);
+        when(localAddress.toString()).thenReturn("127.0.0.1:5672");
+        final Socket socket = mock(Socket.class);
+        when(socket.getRemoteSocketAddress()).thenReturn(new 
InetSocketAddress("localhost", 1000));
+        when(socket.getLocalSocketAddress()).thenReturn(localAddress);
+        final SocketChannel socketChannel = mock(SocketChannel.class);
+        when(socketChannel.socket()).thenReturn(socket);
+        final ProtocolEngine protocolEngine = mock(ProtocolEngine.class);
+        final NetworkConnectionScheduler scheduler = 
mock(NetworkConnectionScheduler.class);
+        final AmqpPort<?> port = mock(AmqpPort.class);
+        final EventLogger eventLogger = mock(EventLogger.class);
+        final Broker broker = mock(Broker.class);
+        when(broker.getEventLogger()).thenReturn(eventLogger);
+        when(port.getContextValue(Integer.class, 
AmqpPort.FINAL_WRITE_THRESHOLD)).thenReturn(100);
+        when(port.getContextValue(Long.class, 
AmqpPort.FINAL_WRITE_TIMEOUT)).thenReturn(100L);
+        when(port.getParent()).thenReturn(broker);
+        final Set<TransportEncryption> encryptionSet = 
Set.of(TransportEncryption.NONE);
+
+        _nonBlockingConnection =
+                new NonBlockingConnection(socketChannel, protocolEngine, 
encryptionSet, () -> {}, scheduler, port);
+
+        appender.clear();
+    }
+
+    /** Delegate always returns WriteResult containing complete = false, 
causing an infinite loop in
+     * NonBlockingConnection#shutdownFinalWrite(), which should be handled by 
the timeout handling */
+    @Test
+    void shutdownFinalWriteLooping() throws Exception
+    {
+        // construct delegate mock returning WriteResult[complete=false]
+        final NonBlockingConnectionPlainDelegate delegate = 
mock(NonBlockingConnectionPlainDelegate.class);
+        when(delegate.doWrite(any())).thenReturn(new 
NonBlockingConnectionDelegate.WriteResult(false, 0L));
+        injectDelegate(delegate);
+
+        // close the connection
+        _nonBlockingConnection.setSelectionTask(selectionTask);
+        _nonBlockingConnection.close();
+        _nonBlockingConnection.doWork();
+
+        // there should be only 2 log messages
+        assertEquals(2, appender.getEvents().size());
+
+        // first log message states that connection will be closed
+        final ILoggingEvent firstLogEntry = appender.getEvents().get(0);
+        assertEquals(Level.DEBUG, firstLogEntry.getLevel());
+        assertEquals("Closing localhost/127.0.0.1:1000", 
firstLogEntry.getMessage());
+
+        // second log message informs about timeout which happened during 
shutdownFinalWrite()
+        final ILoggingEvent secondLogEntry = appender.getEvents().get(1);
+        assertEquals(Level.INFO, secondLogEntry.getLevel());
+        assertEquals("Exception performing final write/close for '{}': {}", 
secondLogEntry.getMessage());
+        assertEquals("localhost/127.0.0.1:1000", 
secondLogEntry.getArgumentArray()[0]);
+        
assertTrue(String.valueOf(secondLogEntry.getArgumentArray()[1]).startsWith("Failed
 to perform final write to connection after"));
+    }
+
+    /** Delegate immediately returns WriteResult containing complete = true, 
no timeout handling involved */
+    @Test
+    void shutdownFinalWriteWithoutLooping() throws Exception
+    {
+        // construct delegate mock returning WriteResult[complete=true]
+        final NonBlockingConnectionPlainDelegate delegate = 
mock(NonBlockingConnectionPlainDelegate.class);
+        when(delegate.doWrite(any())).thenReturn(new 
NonBlockingConnectionDelegate.WriteResult(true, 0L));
+        injectDelegate(delegate);
+
+        // close the connection
+        _nonBlockingConnection.setSelectionTask(selectionTask);
+        _nonBlockingConnection.close();
+        _nonBlockingConnection.doWork();
+
+        // there should be only 1 log message (no timeout)
+        assertEquals(1, appender.getEvents().size());
+
+        // first log message states that connection will be closed
+        final ILoggingEvent firstLogEntry = appender.getEvents().get(0);
+        assertEquals(Level.DEBUG, firstLogEntry.getLevel());
+        assertEquals("Closing localhost/127.0.0.1:1000", 
firstLogEntry.getMessage());
+    }
+
+    /** Inject delegate using reflection */
+    private void injectDelegate(final NonBlockingConnectionPlainDelegate 
delegate) throws Exception
+    {
+        final Field delegateField = 
NonBlockingConnection.class.getDeclaredField("_delegate");
+        delegateField.setAccessible(true);
+        delegateField.set(_nonBlockingConnection, delegate);
+    }
+
+    /** Logging Appender string list of logging events */
+    static class TestAppender implements Appender<ILoggingEvent>
+    {
+        private final String className;
+        private final List<ILoggingEvent> _events = new ArrayList<>();
+
+        TestAppender(final Class<?> clazz)
+        {
+            className = clazz.getCanonicalName();
+        }
+
+        @Override
+        public String getName()
+        {
+            return getClass().getSimpleName();
+        }
+
+        @Override
+        public void doAppend(final ILoggingEvent iLoggingEvent) throws 
LogbackException
+        {
+            if (className.equals(iLoggingEvent.getLoggerName()))
+            {
+                _events.add(iLoggingEvent);
+            }
+        }
+
+        @Override
+        public void setName(final String s)
+        {
+
+        }
+
+        @Override
+        public void setContext(final Context context)
+        {
+
+        }
+
+        @Override
+        public Context getContext()
+        {
+            return null;
+        }
+
+        @Override
+        public void addStatus(final Status status)
+        {
+
+        }
+
+        @Override
+        public void addInfo(final String s)
+        {
+
+        }
+
+        @Override
+        public void addInfo(final String s, final Throwable throwable)
+        {
+
+        }
+
+        @Override
+        public void addWarn(final String s)
+        {
+
+        }
+
+        @Override
+        public void addWarn(final String s, final Throwable throwable)
+        {
+
+        }
+
+        @Override
+        public void addError(final String s)
+        {
+
+        }
+
+        @Override
+        public void addError(final String s, final Throwable throwable)
+        {
+
+        }
+
+        @Override
+        public void addFilter(final Filter<ILoggingEvent> filter)
+        {
+
+        }
+
+        @Override
+        public void clearAllFilters()
+        {
+
+        }
+
+        @Override
+        public List<Filter<ILoggingEvent>> getCopyOfAttachedFiltersList()
+        {
+            return List.of();
+        }
+
+        @Override
+        public FilterReply getFilterChainDecision(final ILoggingEvent 
iLoggingEvent)
+        {
+            return null;
+        }
+
+        @Override
+        public void start()
+        {
+
+        }
+
+        @Override
+        public void stop()
+        {
+
+        }
+
+        @Override
+        public boolean isStarted()
+        {
+            return true;
+        }
+
+        public List<ILoggingEvent> getEvents()
+        {
+            return _events;
+        }
+
+        public void clear()
+        {
+            _events.clear();
+        }
+    }
+}
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
index 510177a91c..435607210a 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
@@ -213,6 +213,8 @@ public class TCPandSSLTransportTest extends UnitTestBase
         when(port.getContextValue(Boolean.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)).thenReturn(false);
         when(port.getContextValue(Integer.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)).thenReturn(1000);
         when(port.getContextValue(Integer.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)).thenReturn(1005);
+        when(port.getContextValue(Integer.class, 
AmqpPort.FINAL_WRITE_THRESHOLD)).thenReturn(100);
+        when(port.getContextValue(Long.class, 
AmqpPort.FINAL_WRITE_TIMEOUT)).thenReturn(100L);
         final ObjectMapper mapper = new ObjectMapper();
         final JavaType type = 
mapper.getTypeFactory().constructCollectionType(List.class, String.class);
         final List<String> allowList = 
mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_ALLOW_LIST, type);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to