This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 1e52b292861cf163e67741f92da2b4497b5fe3aa Author: dakirily <daniel.kiril...@gmail.com> AuthorDate: Fri Jun 25 12:08:22 2021 +0200 QPID-8545: [Broker-J] SSL Engine looping circuit breaker This closes #98 --- .../apache/qpid/server/model/port/AmqpPort.java | 16 ++++++++++ .../server/transport/NonBlockingConnection.java | 1 + .../NonBlockingConnectionTLSDelegate.java | 36 ++++++++++++++++++++-- .../server/transport/TCPandSSLTransportTest.java | 3 ++ 4 files changed, 54 insertions(+), 2 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java index 6bfcc38..7fcb110 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java @@ -56,6 +56,10 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X> String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors"; String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog"; + String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = "qpid.port.amqp.diagnosisOfSslEngineLooping"; + String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = "qpid.port.amqp.diagnosisOfSslEngineLoopingWarnThreshold"; + String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = "qpid.port.amqp.diagnosisOfSslEngineLoopingBreakThreshold"; + @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS) String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString(); @@ -64,6 +68,18 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X> @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS) int DEFAULT_MAX_OPEN_CONNECTIONS = -1; + @SuppressWarnings("unused") + @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING) + boolean DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = false; + + @SuppressWarnings("unused") + @ManagedContextDefault( name = 
PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD) + long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = 1000; + + @SuppressWarnings("unused") + @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD) + long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = 1005; + String PORT_IGNORE_INVALID_SNI = "qpid.port.amqp.ignoreInvalidSni"; @SuppressWarnings("unused") diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index 843e3d7..1bc1943 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -691,4 +691,5 @@ public class NonBlockingConnection implements ServerNetworkConnection, ByteBuffe { return _selectedHost; } + } diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java index 869775e..9255216 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java +++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SNIHostName; import javax.net.ssl.SSLEngine; @@ -63,7 +64,10 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe private QpidByteBuffer _netOutputBuffer; private QpidByteBuffer _applicationBuffer; private final boolean _ignoreInvalidSni; - + private final AtomicInteger _loopingCounter = new AtomicInteger(0); + private final boolean 
_enableDiagnosisOfSslEngineLooping; + private final long _diagnosisOfSslEngineLoopingWarnThreshold; + private final long _diagnosisOfSslEngineLoopingBreakThreshold; public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, AmqpPort port) { @@ -82,6 +86,9 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe _applicationBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize); _netOutputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize); _ignoreInvalidSni = port.isIgnoreInvalidSni(); + _enableDiagnosisOfSslEngineLooping = port.getContextValue(Boolean.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING); + _diagnosisOfSslEngineLoopingWarnThreshold = port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD); + _diagnosisOfSslEngineLoopingBreakThreshold = port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD); } @Override @@ -300,7 +307,27 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe _encryptedOutput.add(_netOutputBuffer); _netOutputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize); } - + // SSLEngine looping circuit breaker + if (_enableDiagnosisOfSslEngineLooping) + { + _loopingCounter.incrementAndGet(); + if (_loopingCounter.get() > _diagnosisOfSslEngineLoopingWarnThreshold) + { + LOGGER.warn("SSLEngine looping detected, _status: {}, _sslEngine.isOutboundDone(): {}, _sslEngine.isInboundDone(): {}, " + + "_sslEngine.getPeerHost(): {}, _sslEngine.getPeerPort(): {}", + "[ Status = " + _status.getStatus() + ", HandshakeStatus = " + _status.getHandshakeStatus() + + ", bytesConsumed = " + _status.bytesConsumed() + ", bytesProduced = " + _status.bytesProduced() + " ]", + _sslEngine.isOutboundDone(), + _sslEngine.isInboundDone(), + _sslEngine.getPeerHost(), + _sslEngine.getPeerPort() + ); + } + if (_loopingCounter.get() > _diagnosisOfSslEngineLoopingBreakThreshold) + { + throw new 
SSLException("SSLEngine looping detected, executing circuit breaker"); + } + } } else { @@ -310,6 +337,11 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe } while(encrypted && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP); + if (_enableDiagnosisOfSslEngineLooping && encrypted) + { + _loopingCounter.set(0); + } + if(_netOutputBuffer.position() != 0) { final QpidByteBuffer outputBuffer = _netOutputBuffer; diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java index 51902ef..4cc90e5 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java @@ -272,6 +272,9 @@ public class TCPandSSLTransportTest extends UnitTestBase when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)) .thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG); when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT); + when(port.getContextValue(Boolean.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)).thenReturn(false); + when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)).thenReturn(1000); + when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)).thenReturn(1005); ObjectMapper mapper = new ObjectMapper(); JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, String.class); List<String> allowList = mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_ALLOW_LIST, type); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org