Repository: kafka Updated Branches: refs/heads/trunk 300565381 -> 361686d4a
KAFKA-2618; Disable SSL renegotiation for 0.9.0.0 Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Sriharsha Chintalapani <schintalap...@hortonworks.com>, Rajini Sivaram <rajinisiva...@googlemail.com>, Jun Rao <jun...@gmail.com> Closes #339 from ijuma/kafka-2618-disable-renegotiation Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/361686d4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/361686d4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/361686d4 Branch: refs/heads/trunk Commit: 361686d4a999298b6e5b63cacda72168172eb936 Parents: 3005653 Author: Ismael Juma <ism...@juma.me.uk> Authored: Wed Oct 21 14:39:39 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Oct 21 14:39:39 2015 -0700 ---------------------------------------------------------------------- .../kafka/common/network/SSLTransportLayer.java | 23 +++- .../kafka/common/network/SSLSelectorTest.java | 110 ++++++++++++++----- .../common/network/SSLTransportLayerTest.java | 2 +- .../kafka/common/network/SelectorTest.java | 2 +- 4 files changed, 99 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java index e7afa02..813f0b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java @@ -31,6 +31,7 @@ import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; +import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLSession; import javax.net.ssl.SSLPeerUnverifiedException; @@ -49,6 +50,8 @@ public class SSLTransportLayer implements TransportLayer { private final SSLEngine sslEngine; private final SelectionKey key; private final SocketChannel socketChannel; + private final boolean enableRenegotiation; + private HandshakeStatus handshakeStatus; private SSLEngineResult handshakeResult; private boolean handshakeComplete = false; @@ -59,17 +62,19 @@ public class SSLTransportLayer implements TransportLayer { private ByteBuffer emptyBuf = ByteBuffer.allocate(0); public static SSLTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { - SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine); + // Disable renegotiation by default until we have fixed the known issues with the existing implementation + SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine, false); transportLayer.startHandshake(); return transportLayer; } // Prefer `create`, only use this in tests - SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { + SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException { this.channelId = channelId; this.key = key; this.socketChannel = (SocketChannel) key.channel(); this.sslEngine = sslEngine; + this.enableRenegotiation = enableRenegotiation; } /** @@ -305,6 +310,12 @@ public class SSLTransportLayer implements TransportLayer { } } + private void renegotiate() throws IOException { + if (!enableRenegotiation) + throw new SSLHandshakeException("Renegotiation is not supported"); + handshake(); + } + /** * Executes the SSLEngine tasks needed. @@ -435,10 +446,10 @@ public class SSLTransportLayer implements TransportLayer { SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer); netReadBuffer.compact(); // handle ssl renegotiation. - if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) { + if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && unwrapResult.getStatus() == Status.OK) { log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); - handshake(); + renegotiate(); break; } @@ -541,8 +552,8 @@ public class SSLTransportLayer implements TransportLayer { netWriteBuffer.flip(); //handle ssl renegotiation - if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) { - handshake(); + if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) { + renegotiate(); return written; } http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java index 6475ff0..eee7531 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java @@ -13,8 +13,13 @@ package org.apache.kafka.common.network; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.io.File; import java.io.IOException; @@ -22,6 +27,7 @@ import java.net.InetSocketAddress; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.security.ssl.SSLFactory; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestSSLUtils; import org.junit.After; @@ -34,6 +40,7 @@ import org.junit.Test; public class SSLSelectorTest extends SelectorTest { private Metrics metrics; + private Map<String, Object> sslClientConfigs; @Before public void setup() throws Exception { @@ -44,7 +51,7 @@ public class SSLSelectorTest extends SelectorTest { this.server = new EchoServer(sslServerConfigs); this.server.start(); this.time = new MockTime(); - Map<String, Object> sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile, "client"); + sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile, "client"); sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT); @@ -66,46 +73,89 @@ public class SSLSelectorTest extends SelectorTest { */ @Test public void testRenegotiation() throws Exception { - int reqs = 500; + ChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT) { + @Override + protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + SSLTransportLayer transportLayer = new SSLTransportLayer(id, key, + sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()), + true); + transportLayer.startHandshake(); + return transportLayer; + } + }; + channelBuilder.configure(sslClientConfigs); + Selector selector = new Selector(5000, metrics, time, "MetricGroup2", new LinkedHashMap<String, String>(), channelBuilder); + try { + int reqs = 500; + String node = "0"; + // create connections + InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + + // send echo requests and receive responses + int requests = 0; + int responses = 0; + int renegotiates = 0; + while (!selector.isChannelReady(node)) { + selector.poll(1000L); + } + selector.send(createSend(node, node + "-" + 0)); + requests++; + + // loop until we complete all requests + while (responses < reqs) { + selector.poll(0L); + if (responses >= 100 && renegotiates == 0) { + renegotiates++; + server.renegotiate(); + } + assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size()); + + // handle any responses we may have gotten + for (NetworkReceive receive : selector.completedReceives()) { + String[] pieces = asString(receive).split("-"); + assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); + assertEquals("Check the source", receive.source(), pieces[0]); + assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position()); + assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1])); + responses++; + } + + // prepare new sends for the next round + for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) { + selector.send(createSend(node, node + "-" + requests)); + } + } + } finally { + selector.close(); + } + } + + @Test + public void testDisabledRenegotiation() throws Exception { String node = "0"; // create connections InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); // send echo requests and receive responses - int requests = 0; - int responses = 0; - int renegotiates = 0; while (!selector.isChannelReady(node)) { selector.poll(1000L); } selector.send(createSend(node, node + "-" + 0)); - requests++; - - // loop until we complete all requests - while (responses < reqs) { - selector.poll(0L); - if (responses >= 100 && renegotiates == 0) { - renegotiates++; - server.renegotiate(); - } - assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size()); - - // handle any responses we may have gotten - for (NetworkReceive receive : selector.completedReceives()) { - String[] pieces = asString(receive).split("-"); - assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); - assertEquals("Check the source", receive.source(), pieces[0]); - assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position()); - assertEquals("Check the request counter", responses, Integer.parseInt(pieces[1])); - responses++; - } - - // prepare new sends for the next round - for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); i++, requests++) { - selector.send(createSend(node, node + "-" + requests)); - } + selector.poll(0L); + server.renegotiate(); + selector.send(createSend(node, node + "-" + 1)); + long expiryTime = System.currentTimeMillis() + 2000; + + List<String> disconnected = new ArrayList<>(); + while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) { + selector.poll(10); + disconnected.addAll(selector.disconnected()); } + assertTrue("Renegotiation should cause disconnection", disconnected.contains(node)); + } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java index 987f4bb..ebb59b5 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java @@ -493,7 +493,7 @@ public class SSLTransportLayerTest { public TestSSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException { - super(channelId, key, sslEngine); + super(channelId, key, sslEngine, false); this.netReadBufSize = new ResizeableBufferSize(netReadBufSize); this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSize); this.appBufSize = new ResizeableBufferSize(appBufSize); http://git-wip-us.apache.org/repos/asf/kafka/blob/361686d4/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 6aa60ce..683eeee 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -42,7 +42,7 @@ public class SelectorTest { protected EchoServer server; protected Time time; - protected Selectable selector; + protected Selector selector; protected ChannelBuilder channelBuilder; private Metrics metrics;