This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 0e46be5bf07 KAFKA-16471 invoke SSLEngine::closeInbound on 
SslTransportLayer close (#15655)
0e46be5bf07 is described below

commit 0e46be5bf078f6ca734b7502a1f2b10cf8bc4eee
Author: Gaurav Narula <gaurav_naru...@apple.com>
AuthorDate: Fri Apr 5 05:11:03 2024 +0100

    KAFKA-16471 invoke SSLEngine::closeInbound on SslTransportLayer close 
(#15655)
    
    Invokes `SSLEngine::closeInbound` after we flush close_notify alert tothe 
socket. This fixes memory leak in Netty/OpenSSL based SSLEngine which only free 
native resources once closeInbound has been invoked.
    
    Reviewers: Omnia Ibrahim <o.g.h.ibra...@gmail.com>, Luke Chen 
<show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../kafka/common/network/SslTransportLayer.java    |  8 +++++
 .../common/network/SslTransportLayerTest.java      | 35 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 904c5216a40..870b1c7f7dc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -199,6 +199,14 @@ public class SslTransportLayer implements TransportLayer {
         } catch (IOException ie) {
             log.debug("Failed to send SSL Close message", ie);
         } finally {
+            try {
+                sslEngine.closeInbound();
+            } catch (SSLException e) {
+                // This log is for debugging purposes as an exception might 
occur frequently
+                // at this point due to peers not following the TLS specs and 
failing to send a close_notify alert. 
+                // Even if they do, currently, we do not read data from the 
socket after invoking close().
+                log.debug("SSLEngine.closeInBound() raised an exception.", e);
+            }
             socketChannel.socket().close();
             socketChannel.close();
             netReadBuffer = null;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index d92f4facb3c..e65d53b2b7a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -47,6 +48,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.SelectionKey;
@@ -65,6 +67,7 @@ import java.util.stream.Stream;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLParameters;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -72,6 +75,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the SSL transport layer. These use a test harness that runs a 
simple socket server that echos back responses.
@@ -1467,4 +1476,30 @@ public class SslTransportLayerTest {
             }
         }
     }
+
+    @Test
+    public void testSSLEngineCloseInboundInvokedOnClose() throws IOException {
+        // Given
+        SSLEngine sslEngine = mock(SSLEngine.class);
+        Socket socket = mock(Socket.class);
+        SocketChannel socketChannel = mock(SocketChannel.class);
+        SelectionKey selectionKey = mock(SelectionKey.class);
+        when(socketChannel.socket()).thenReturn(socket);
+        when(selectionKey.channel()).thenReturn(socketChannel);
+        doThrow(new SSLException("Mock 
exception")).when(sslEngine).closeInbound();
+        SslTransportLayer sslTransportLayer = new SslTransportLayer(
+                "test-channel",
+                selectionKey,
+                sslEngine,
+                mock(ChannelMetadataRegistry.class)
+        );
+
+        // When
+        sslTransportLayer.close();
+
+        // Then
+        verify(sslEngine, times(1)).closeOutbound();
+        verify(sslEngine, times(1)).closeInbound();
+        verifyNoMoreInteractions(sslEngine);
+    }
 }

Reply via email to