This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 6685c0c347 QPID-8700 - [Broker-J] NonBlockingConnection#shutdownFinalWrite() may loop infinitely (#299) 6685c0c347 is described below commit 6685c0c347093b025adb074327752826f149c437 Author: Daniil Kirilyuk <daniel.kiril...@gmail.com> AuthorDate: Tue Jul 29 10:06:53 2025 +0200 QPID-8700 - [Broker-J] NonBlockingConnection#shutdownFinalWrite() may loop infinitely (#299) --- .../apache/qpid/server/model/port/AmqpPort.java | 12 + .../server/transport/NonBlockingConnection.java | 15 +- .../transport/NonBlockingConnectionTest.java | 308 +++++++++++++++++++++ .../server/transport/TCPandSSLTransportTest.java | 2 + 4 files changed, 335 insertions(+), 2 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java index b51f90d710..20afa83d11 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java @@ -159,6 +159,18 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X> description = "The connection property enrichers to apply to connections created on this port.") String DEFAULT_CONNECTION_PROTOCOL_ENRICHERS = "[ \"STANDARD\" ] "; + String FINAL_WRITE_THRESHOLD = "qpid.port.final_write_threshold"; + @SuppressWarnings("unused") + @ManagedContextDefault(name = FINAL_WRITE_THRESHOLD, description = "Threshold to check for final write timeout.") + int DEFAULT_FINAL_WRITE_THRESHOLD = 100; + + String FINAL_WRITE_TIMEOUT = "qpid.port.final_write_timeout"; + @SuppressWarnings("unused") + @ManagedContextDefault(name = FINAL_WRITE_TIMEOUT, + description = "Maximum time allowed for a connection to be closed." + + " If the connection does not close this time, it will be aborted.") + long DEFAULT_FINAL_WRITE_TIMEOUT = 1000L; + @ManagedAttribute( defaultValue = AmqpPort.DEFAULT_AMQP_TCP_NO_DELAY ) boolean isTcpNoDelay(); diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index fdc99dd0eb..aab4806a2c 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -58,6 +58,8 @@ public class NonBlockingConnection implements ServerNetworkConnection, ByteBuffe private final AtomicBoolean _closed = new AtomicBoolean(false); private final ProtocolEngine _protocolEngine; private final Runnable _onTransportEncryptionAction; + private final int _finalWriteThreshold; + private final long _finalWriteTimeout; private volatile boolean _fullyWritten = true; @@ -110,7 +112,8 @@ public class NonBlockingConnection implements ServerNetworkConnection, ByteBuffe { _delegate = new NonBlockingConnectionUndecidedDelegate(this); } - + _finalWriteThreshold = port.getContextValue(Integer.class, AmqpPort.FINAL_WRITE_THRESHOLD); + _finalWriteTimeout = port.getContextValue(Long.class, AmqpPort.FINAL_WRITE_TIMEOUT); } String getThreadName() @@ -414,8 +417,16 @@ public class NonBlockingConnection implements ServerNetworkConnection, ByteBuffe { try { - while(!doWrite()) + final long startTime = System.currentTimeMillis(); + int cnt = 0; + while (!doWrite()) { + if (cnt % _finalWriteThreshold == 0 && System.currentTimeMillis() - startTime > _finalWriteTimeout) + { + final long executionTime = System.currentTimeMillis() - startTime; + throw new IOException("Failed to perform final write to connection after " + executionTime + " ms timeout"); + } + cnt++; } } catch (IOException e) diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java new file mode 100644 index 0000000000..b18416a8fd --- /dev/null +++ b/broker-core/src/test/java/org/apache/qpid/server/transport/NonBlockingConnectionTest.java @@ -0,0 +1,308 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.transport; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Appender; +import ch.qos.logback.core.Context; +import ch.qos.logback.core.LogbackException; +import ch.qos.logback.core.filter.Filter; +import ch.qos.logback.core.spi.FilterReply; +import ch.qos.logback.core.status.Status; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.model.Broker; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.transport.network.TransportEncryption; + +class NonBlockingConnectionTest +{ + private static final TestAppender appender = new TestAppender(NonBlockingConnection.class); + private static final ch.qos.logback.classic.Logger ROOT_LOGGER = + (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); + + private final SelectorThread.SelectionTask selectionTask = mock(SelectorThread.SelectionTask.class); + + private NonBlockingConnection _nonBlockingConnection; + + @BeforeAll + static void beforeAll() + { + ROOT_LOGGER.addAppender(appender); + } + + @AfterAll + static void afterAll() + { + ROOT_LOGGER.detachAppender(appender.getName()); + } + + @BeforeEach + void beforeEach() + { + final SocketAddress localAddress = mock(SocketAddress.class); + when(localAddress.toString()).thenReturn("127.0.0.1:5672"); + final Socket socket = mock(Socket.class); + when(socket.getRemoteSocketAddress()).thenReturn(new InetSocketAddress("localhost", 1000)); + when(socket.getLocalSocketAddress()).thenReturn(localAddress); + final SocketChannel socketChannel = mock(SocketChannel.class); + when(socketChannel.socket()).thenReturn(socket); + final ProtocolEngine protocolEngine = mock(ProtocolEngine.class); + final NetworkConnectionScheduler scheduler = mock(NetworkConnectionScheduler.class); + final AmqpPort<?> port = mock(AmqpPort.class); + final EventLogger eventLogger = mock(EventLogger.class); + final Broker broker = mock(Broker.class); + when(broker.getEventLogger()).thenReturn(eventLogger); + when(port.getContextValue(Integer.class, AmqpPort.FINAL_WRITE_THRESHOLD)).thenReturn(100); + when(port.getContextValue(Long.class, AmqpPort.FINAL_WRITE_TIMEOUT)).thenReturn(100L); + when(port.getParent()).thenReturn(broker); + final Set<TransportEncryption> encryptionSet = Set.of(TransportEncryption.NONE); + + _nonBlockingConnection = + new NonBlockingConnection(socketChannel, protocolEngine, encryptionSet, () -> {}, scheduler, port); + + appender.clear(); + } + + /** Delegate always returns WriteResult containing complete = false, causing an infinite loop in + * NonBlockingConnection#shutdownFinalWrite(), which should be handled by the timeout handling */ + @Test + void shutdownFinalWriteLooping() throws Exception + { + // construct delegate mock returning WriteResult[complete=false] + final NonBlockingConnectionPlainDelegate delegate = mock(NonBlockingConnectionPlainDelegate.class); + when(delegate.doWrite(any())).thenReturn(new NonBlockingConnectionDelegate.WriteResult(false, 0L)); + injectDelegate(delegate); + + // close the connection + _nonBlockingConnection.setSelectionTask(selectionTask); + _nonBlockingConnection.close(); + _nonBlockingConnection.doWork(); + + // there should be only 2 log messages + assertEquals(2, appender.getEvents().size()); + + // first log message states that connection will be closed + final ILoggingEvent firstLogEntry = appender.getEvents().get(0); + assertEquals(Level.DEBUG, firstLogEntry.getLevel()); + assertEquals("Closing localhost/127.0.0.1:1000", firstLogEntry.getMessage()); + + // second log message informs about timeout which happened during shutdownFinalWrite() + final ILoggingEvent secondLogEntry = appender.getEvents().get(1); + assertEquals(Level.INFO, secondLogEntry.getLevel()); + assertEquals("Exception performing final write/close for '{}': {}", secondLogEntry.getMessage()); + assertEquals("localhost/127.0.0.1:1000", secondLogEntry.getArgumentArray()[0]); + assertTrue(String.valueOf(secondLogEntry.getArgumentArray()[1]).startsWith("Failed to perform final write to connection after")); + } + + /** Delegate immediately returns WriteResult containing complete = true, no timeout handling involved */ + @Test + void shutdownFinalWriteWithoutLooping() throws Exception + { + // construct delegate mock returning WriteResult[complete=true] + final NonBlockingConnectionPlainDelegate delegate = mock(NonBlockingConnectionPlainDelegate.class); + when(delegate.doWrite(any())).thenReturn(new NonBlockingConnectionDelegate.WriteResult(true, 0L)); + injectDelegate(delegate); + + // close the connection + _nonBlockingConnection.setSelectionTask(selectionTask); + _nonBlockingConnection.close(); + _nonBlockingConnection.doWork(); + + // there should be only 1 log message (no timeout) + assertEquals(1, appender.getEvents().size()); + + // first log message states that connection will be closed + final ILoggingEvent firstLogEntry = appender.getEvents().get(0); + assertEquals(Level.DEBUG, firstLogEntry.getLevel()); + assertEquals("Closing localhost/127.0.0.1:1000", firstLogEntry.getMessage()); + } + + /** Inject delegate using reflection */ + private void injectDelegate(final NonBlockingConnectionPlainDelegate delegate) throws Exception + { + final Field delegateField = NonBlockingConnection.class.getDeclaredField("_delegate"); + delegateField.setAccessible(true); + delegateField.set(_nonBlockingConnection, delegate); + } + + /** Logging Appender string list of logging events */ + static class TestAppender implements Appender<ILoggingEvent> + { + private final String className; + private final List<ILoggingEvent> _events = new ArrayList<>(); + + TestAppender(final Class<?> clazz) + { + className = clazz.getCanonicalName(); + } + + @Override + public String getName() + { + return getClass().getSimpleName(); + } + + @Override + public void doAppend(final ILoggingEvent iLoggingEvent) throws LogbackException + { + if (className.equals(iLoggingEvent.getLoggerName())) + { + _events.add(iLoggingEvent); + } + } + + @Override + public void setName(final String s) + { + + } + + @Override + public void setContext(final Context context) + { + + } + + @Override + public Context getContext() + { + return null; + } + + @Override + public void addStatus(final Status status) + { + + } + + @Override + public void addInfo(final String s) + { + + } + + @Override + public void addInfo(final String s, final Throwable throwable) + { + + } + + @Override + public void addWarn(final String s) + { + + } + + @Override + public void addWarn(final String s, final Throwable throwable) + { + + } + + @Override + public void addError(final String s) + { + + } + + @Override + public void addError(final String s, final Throwable throwable) + { + + } + + @Override + public void addFilter(final Filter<ILoggingEvent> filter) + { + + } + + @Override + public void clearAllFilters() + { + + } + + @Override + public List<Filter<ILoggingEvent>> getCopyOfAttachedFiltersList() + { + return List.of(); + } + + @Override + public FilterReply getFilterChainDecision(final ILoggingEvent iLoggingEvent) + { + return null; + } + + @Override + public void start() + { + + } + + @Override + public void stop() + { + + } + + @Override + public boolean isStarted() + { + return true; + } + + public List<ILoggingEvent> getEvents() + { + return _events; + } + + public void clear() + { + _events.clear(); + } + } +} diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java index 510177a91c..435607210a 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java @@ -213,6 +213,8 @@ public class TCPandSSLTransportTest extends UnitTestBase when(port.getContextValue(Boolean.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)).thenReturn(false); when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)).thenReturn(1000); when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)).thenReturn(1005); + when(port.getContextValue(Integer.class, AmqpPort.FINAL_WRITE_THRESHOLD)).thenReturn(100); + when(port.getContextValue(Long.class, AmqpPort.FINAL_WRITE_TIMEOUT)).thenReturn(100L); final ObjectMapper mapper = new ObjectMapper(); final JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, String.class); final List<String> allowList = mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_ALLOW_LIST, type); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org