This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new edfd769  KAFKA-13418: Support key updates with TLS 1.3 (#11966)
edfd769 is described below

commit edfd769f426e5baaf94c379e23624ec82e3e80bb
Author: Ismael Juma <[email protected]>
AuthorDate: Tue Mar 29 14:59:38 2022 -0700

    KAFKA-13418: Support key updates with TLS 1.3 (#11966)
    
    Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
    Update the read/write paths so that they no longer throw an exception in this case
    (the exception is kept in the `handshake` method).
    
    With the default configuration, key updates happen after 2^37 bytes are encrypted.
    There is a security property (`jdk.tls.keyLimits`) to adjust this limit, but it has
    to be set before it is used for the first time and it cannot be changed after that.
    As such, testing with a lowered limit is best done via a system test (filed KAFKA-13779).
    
    To validate the change, I wrote a unit test that forces key updates and manually ran
    a producer workload that produced more than 2^37 bytes. Both cases failed without
    these changes and pass with them.
    
    Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix, and hence
    I included them as a co-author of this change.
    
    Reviewers: Rajini Sivaram <[email protected]>
    
    Co-authored-by: Shylaja Kokoori
---
 .../kafka/common/network/SslTransportLayer.java    | 16 ++--
 .../apache/kafka/common/network/SelectorTest.java  |  5 --
 .../kafka/common/network/SslSelectorTest.java      | 44 ++---------
 .../kafka/common/network/Tls12SelectorTest.java    | 72 +++++++++++++++++
 .../kafka/common/network/Tls13SelectorTest.java    | 92 ++++++++++++++++++++++
 5 files changed, 180 insertions(+), 49 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index b9879ad..d276e99 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -71,6 +71,8 @@ public class SslTransportLayer implements TransportLayer {
         CLOSING
     }
 
+    private static final String TLS13 = "TLSv1.3";
+
     private final String channelId;
     private final SSLEngine sslEngine;
     private final SelectionKey key;
@@ -446,7 +448,7 @@ public class SslTransportLayer implements TransportLayer {
             if (netWriteBuffer.hasRemaining())
                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
             else {
-                state = sslEngine.getSession().getProtocol().equals("TLSv1.3") 
? State.POST_HANDSHAKE : State.READY;
+                state = sslEngine.getSession().getProtocol().equals(TLS13) ? 
State.POST_HANDSHAKE : State.READY;
                 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                 SSLSession session = sslEngine.getSession();
                 log.debug("SSL handshake completed successfully with peerHost 
'{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
@@ -578,10 +580,11 @@ public class SslTransportLayer implements TransportLayer {
                         throw e;
                 }
                 netReadBuffer.compact();
-                // handle ssl renegotiation.
+                // reject renegotiation if TLS < 1.3, key updates for TLS 1.3 
are allowed
                 if (unwrapResult.getHandshakeStatus() != 
HandshakeStatus.NOT_HANDSHAKING &&
                         unwrapResult.getHandshakeStatus() != 
HandshakeStatus.FINISHED &&
-                        unwrapResult.getStatus() == Status.OK) {
+                        unwrapResult.getStatus() == Status.OK &&
+                        !sslEngine.getSession().getProtocol().equals(TLS13)) {
                     log.error("Renegotiation requested, but it is not 
supported, channelId {}, " +
                         "appReadBuffer pos {}, netReadBuffer pos {}, 
netWriteBuffer pos {} handshakeStatus {}", channelId,
                         appReadBuffer.position(), netReadBuffer.position(), 
netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
@@ -699,9 +702,12 @@ public class SslTransportLayer implements TransportLayer {
             SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
             netWriteBuffer.flip();
 
-            //handle ssl renegotiation
-            if (wrapResult.getHandshakeStatus() != 
HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
+            // reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are 
allowed
+            if (wrapResult.getHandshakeStatus() != 
HandshakeStatus.NOT_HANDSHAKING &&
+                    wrapResult.getStatus() == Status.OK &&
+                    !sslEngine.getSession().getProtocol().equals(TLS13)) {
                 throw renegotiationException();
+            }
 
             if (wrapResult.getStatus() == Status.OK) {
                 written += wrapResult.bytesConsumed();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index f276cd4..43b0956 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -110,10 +110,6 @@ public class SelectorTest {
         }
     }
 
-    public SecurityProtocol securityProtocol() {
-        return SecurityProtocol.PLAINTEXT;
-    }
-
     protected Map<String, Object> clientConfigs() {
         return new HashMap<>();
     }
@@ -1015,7 +1011,6 @@ public class SelectorTest {
 
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
-        selector.poll(1000L);
         while (true) {
             selector.poll(1000L);
             for (NetworkReceive receive : selector.completedReceives())
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 7f95566..0ddfce6 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.network;
 
 import java.nio.channels.SelectionKey;
+import java.security.GeneralSecurityException;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.kafka.common.config.SecurityConfig;
@@ -43,11 +44,9 @@ import java.net.InetSocketAddress;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.Security;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -61,7 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * A set of tests for the selector. These use a test harness that runs a 
simple socket server that echos back responses.
  */
-public class SslSelectorTest extends SelectorTest {
+public abstract class SslSelectorTest extends SelectorTest {
 
     private Map<String, Object> sslClientConfigs;
 
@@ -73,7 +72,7 @@ public class SslSelectorTest extends SelectorTest {
         this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
         this.server.start();
         this.time = new MockTime();
-        sslClientConfigs = TestSslUtils.createSslConfig(false, false, 
Mode.CLIENT, trustStoreFile, "client");
+        sslClientConfigs = createSslClientConfigs(trustStoreFile);
         LogContext logContext = new LogContext();
         this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, 
logContext);
         this.channelBuilder.configure(sslClientConfigs);
@@ -81,6 +80,8 @@ public class SslSelectorTest extends SelectorTest {
         this.selector = new Selector(5000, metrics, time, "MetricGroup", 
channelBuilder, logContext);
     }
 
+    protected abstract Map<String, Object> createSslClientConfigs(File 
trustStoreFile) throws GeneralSecurityException, IOException;
+
     @AfterEach
     public void tearDown() throws Exception {
         this.selector.close();
@@ -89,18 +90,12 @@ public class SslSelectorTest extends SelectorTest {
     }
 
     @Override
-    public SecurityProtocol securityProtocol() {
-        return SecurityProtocol.PLAINTEXT;
-    }
-
-    @Override
     protected Map<String, Object> clientConfigs() {
         return sslClientConfigs;
     }
 
     @Test
     public void testConnectionWithCustomKeyManager() throws Exception {
-
         TestProviderCreator testProviderCreator = new TestProviderCreator();
 
         int requestSize = 100 * 1024;
@@ -249,35 +244,6 @@ public class SslSelectorTest extends SelectorTest {
         verifySelectorEmpty();
     }
 
-    /**
-     * Renegotiation is not supported since it is potentially unsafe and it 
has been removed in TLS 1.3
-     */
-    @Test
-    public void testRenegotiationFails() throws Exception {
-        String node = "0";
-        // create connections
-        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port);
-        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        // send echo requests and receive responses
-        while (!selector.isChannelReady(node)) {
-            selector.poll(1000L);
-        }
-        selector.send(createSend(node, node + "-" + 0));
-        selector.poll(0L);
-        server.renegotiate();
-        selector.send(createSend(node, node + "-" + 1));
-        long expiryTime = System.currentTimeMillis() + 2000;
-
-        List<String> disconnected = new ArrayList<>();
-        while (!disconnected.contains(node) && System.currentTimeMillis() < 
expiryTime) {
-            selector.poll(10);
-            disconnected.addAll(selector.disconnected().keySet());
-        }
-        assertTrue(disconnected.contains(node), "Renegotiation should cause 
disconnection");
-
-    }
-
     @Override
     @Test
     public void testMuteOnOOM() throws Exception {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
new file mode 100644
index 0000000..59903b5
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/Tls12SelectorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+import org.junit.jupiter.api.Test;
+
+public class Tls12SelectorTest extends SslSelectorTest {
+
+    @Override
+    protected Map<String, Object> createSslClientConfigs(File trustStoreFile)
+        throws GeneralSecurityException, IOException {
+        Map<String, Object> configs = TestSslUtils.createSslConfig(false, 
false, Mode.CLIENT,
+            trustStoreFile, "client");
+        configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
asList("TLSv1.2"));
+        return configs;
+    }
+
+    /**
+     * Renegotiation is not supported when TLS 1.2 is used (renegotiation was 
removed from TLS 1.3)
+     */
+    @Test
+    public void testRenegotiationFails() throws Exception {
+        String node = "0";
+        // create connections
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+
+        // send echo requests and receive responses
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, node + "-" + 0));
+        selector.poll(0L);
+        server.renegotiate();
+        selector.send(createSend(node, node + "-" + 1));
+        long expiryTime = System.currentTimeMillis() + 2000;
+
+        List<String> disconnected = new ArrayList<>();
+        while (!disconnected.contains(node) && System.currentTimeMillis() < 
expiryTime) {
+            selector.poll(10);
+            disconnected.addAll(selector.disconnected().keySet());
+        }
+        assertTrue(disconnected.contains(node), "Renegotiation should cause 
disconnection");
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
new file mode 100644
index 0000000..afae3e2
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/Tls13SelectorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+
+@EnabledForJreRange(min = JRE.JAVA_11) // TLS 1.3 is only supported with Java 
11 and newer
+public class Tls13SelectorTest extends SslSelectorTest {
+
+    @Override
+    protected Map<String, Object> createSslClientConfigs(File trustStoreFile) 
throws GeneralSecurityException, IOException {
+        Map<String, Object> configs = TestSslUtils.createSslConfig(false, 
false, Mode.CLIENT,
+            trustStoreFile, "client");
+        configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
asList("TLSv1.3"));
+        return configs;
+    }
+
+    /**
+     * TLS 1.3 has a post-handshake key and IV update, which will update the 
sending and receiving keys
+     * for one side of the connection.
+     *
+     * Key Usage Limits will trigger an update when the algorithm limits are 
reached, but the default
+     * value is too large (2^37 bytes of plaintext data) for a unit test. This 
value can be overridden
+     * via the security property `jdk.tls.keyLimits`, but that's also 
difficult to achieve in a unit
+     * test.
+     *
+     * Applications can also trigger an update by calling 
`SSLSocket.startHandshake()` or
+     * `SSLEngine.beginHandshake()` (this would trigger `renegotiation` with 
TLS 1.2) and that's the
+     * approach we take here.
+     */
+    @Test
+    public void testKeyUpdate() throws Exception {
+        String node = "0";
+        // create connections
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port);
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        // send echo requests and receive responses
+        while (!selector.isChannelReady(node)) {
+            selector.poll(1000L);
+        }
+        selector.send(createSend(node, node + "-" + 0));
+        selector.poll(0L);
+        server.renegotiate();
+        selector.send(createSend(node,  node + "-" + 1));
+        List<NetworkReceive> received = new ArrayList<>();
+        TestUtils.waitForCondition(() -> {
+            try {
+                selector.poll(1000L);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            for (NetworkReceive receive : selector.completedReceives()) {
+                if (receive.source().equals(node))
+                    received.add(receive);
+            }
+            return received.size() == 2;
+        }, "Expected two receives, got " + received.size());
+
+        assertEquals(asList("0-0", "0-1"), 
received.stream().map(this::asString).collect(Collectors.toList()));
+    }
+}

Reply via email to