This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 1e52b292861cf163e67741f92da2b4497b5fe3aa
Author: dakirily <daniel.kiril...@gmail.com>
AuthorDate: Fri Jun 25 12:08:22 2021 +0200

    QPID-8545: [Broker-J] SSL Engine looping circuit breaker
    
    This closes #98
---
 .../apache/qpid/server/model/port/AmqpPort.java    | 16 ++++++++++
 .../server/transport/NonBlockingConnection.java    |  1 +
 .../NonBlockingConnectionTLSDelegate.java          | 36 ++++++++++++++++++++--
 .../server/transport/TCPandSSLTransportTest.java   |  3 ++
 4 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index 6bfcc38..7fcb110 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -56,6 +56,10 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
     String PORT_AMQP_NUMBER_OF_SELECTORS = 
"qpid.port.amqp.threadPool.numberOfSelectors";
     String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
 
+    String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = 
"qpid.port.amqp.diagnosisOfSslEngineLooping";
+    String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = 
"qpid.port.amqp.diagnosisOfSslEngineLoopingWarnThreshold";
+    String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = 
"qpid.port.amqp.diagnosisOfSslEngineLoopingBreakThreshold";
+
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
     String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
 
@@ -64,6 +68,18 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
     @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
     int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)
+    boolean DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = false;
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = 
PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)
+    long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = 1000;
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = 
PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)
+    long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = 1005;
+
     String PORT_IGNORE_INVALID_SNI = "qpid.port.amqp.ignoreInvalidSni";
 
     @SuppressWarnings("unused")
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 843e3d7..1bc1943 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -691,4 +691,5 @@ public class NonBlockingConnection implements ServerNetworkConnection, ByteBuffe
     {
         return _selectedHost;
     }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
index 869775e..9255216 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SNIHostName;
 import javax.net.ssl.SSLEngine;
@@ -63,7 +64,10 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe
     private QpidByteBuffer _netOutputBuffer;
     private QpidByteBuffer _applicationBuffer;
     private final boolean _ignoreInvalidSni;
-
+    private final AtomicInteger _loopingCounter = new AtomicInteger(0);
+    private final boolean _enableDiagnosisOfSslEngineLooping;
+    private final long _diagnosisOfSslEngineLoopingWarnThreshold;
+    private final long _diagnosisOfSslEngineLoopingBreakThreshold;
 
     public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, 
AmqpPort port)
     {
@@ -82,6 +86,9 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe
         _applicationBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
         _netOutputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
         _ignoreInvalidSni = port.isIgnoreInvalidSni();
+        _enableDiagnosisOfSslEngineLooping = 
port.getContextValue(Boolean.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING);
+        _diagnosisOfSslEngineLoopingWarnThreshold = 
port.getContextValue(Integer.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD);
+        _diagnosisOfSslEngineLoopingBreakThreshold = 
port.getContextValue(Integer.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD);
     }
 
     @Override
@@ -300,7 +307,27 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe
                     _encryptedOutput.add(_netOutputBuffer);
                     _netOutputBuffer = 
QpidByteBuffer.allocateDirect(_networkBufferSize);
                 }
-
+                // SSLEngine looping circuit breaker
+                if (_enableDiagnosisOfSslEngineLooping)
+                {
+                    _loopingCounter.incrementAndGet();
+                    if (_loopingCounter.get() > 
_diagnosisOfSslEngineLoopingWarnThreshold)
+                    {
+                        LOGGER.warn("SSLEngine looping detected, _status: {}, 
_sslEngine.isOutboundDone(): {}, _sslEngine.isInboundDone(): {}, "
+                                        + "_sslEngine.getPeerHost(): {}, 
_sslEngine.getPeerPort(): {}",
+                                "[ Status = " + _status.getStatus() + ", 
HandshakeStatus = " + _status.getHandshakeStatus()
+                                        + ", bytesConsumed = " + 
_status.bytesConsumed() + ", bytesProduced = " + _status.bytesProduced() + " ]",
+                                _sslEngine.isOutboundDone(),
+                                _sslEngine.isInboundDone(),
+                                _sslEngine.getPeerHost(),
+                                _sslEngine.getPeerPort()
+                        );
+                    }
+                    if (_loopingCounter.get() > 
_diagnosisOfSslEngineLoopingBreakThreshold)
+                    {
+                        throw new SSLException("SSLEngine looping detected, 
executing circuit breaker");
+                    }
+                }
             }
             else
             {
@@ -310,6 +337,11 @@ public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDe
         }
         while(encrypted && _sslEngine.getHandshakeStatus() != 
SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
 
+        if (_enableDiagnosisOfSslEngineLooping && encrypted)
+        {
+            _loopingCounter.set(0);
+        }
+
         if(_netOutputBuffer.position() != 0)
         {
             final QpidByteBuffer outputBuffer = _netOutputBuffer;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
index 51902ef..4cc90e5 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
@@ -272,6 +272,9 @@ public class TCPandSSLTransportTest extends UnitTestBase
         when(port.getContextValue(Integer.class, 
AmqpPort.PORT_AMQP_ACCEPT_BACKLOG))
                 .thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
         
when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
+        when(port.getContextValue(Boolean.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)).thenReturn(false);
+        when(port.getContextValue(Integer.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)).thenReturn(1000);
+        when(port.getContextValue(Integer.class, 
AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)).thenReturn(1005);
         ObjectMapper mapper = new ObjectMapper();
         JavaType type = 
mapper.getTypeFactory().constructCollectionType(List.class, String.class);
         List<String> allowList = 
mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_ALLOW_LIST, type);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to