http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java new file mode 100644 index 0000000..a3567af --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.network; + +/* + * Transport layer for PLAINTEXT communication + */ + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.channels.SelectionKey; + +import java.security.Principal; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PlaintextTransportLayer implements TransportLayer { + private static final Logger log = LoggerFactory.getLogger(PlaintextTransportLayer.class); + private final SelectionKey key; + private final SocketChannel socketChannel; + private final Principal principal = new KafkaPrincipal("ANONYMOUS"); + + public PlaintextTransportLayer(SelectionKey key) throws IOException { + this.key = key; + this.socketChannel = (SocketChannel) key.channel(); + } + + @Override + public boolean ready() { + return true; + } + + @Override + public void finishConnect() throws IOException { + socketChannel.finishConnect(); + int ops = key.interestOps(); + ops &= ~SelectionKey.OP_CONNECT; + ops |= SelectionKey.OP_READ; + key.interestOps(ops); + } + + @Override + public void disconnect() { + key.cancel(); + } + + @Override + public SocketChannel socketChannel() { + return socketChannel; + } + + @Override + public boolean isOpen() { + return socketChannel.isOpen(); + } + + @Override + public boolean isConnected() { + return socketChannel.isConnected(); + } + + /** + * Closes this channel + * + * @throws IOException If I/O error occurs + */ + @Override + public void close() throws IOException { + socketChannel.socket().close(); + socketChannel.close(); + key.attach(null); + key.cancel(); + } + + /** + * Performs SSL handshake hence is a no-op for the non-secure + * implementation + * @throws IOException + */ + @Override + public void handshake() throws IOException {} + + /** + * Reads a sequence of bytes from this channel into the given buffer. 
+ * + * @param dst The buffer into which bytes are to be transferred + * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream + * @throws IOException if some other I/O error occurs + */ + @Override + public int read(ByteBuffer dst) throws IOException { + return socketChannel.read(dst); + } + + /** + * Reads a sequence of bytes from this channel into the given buffers. + * + * @param dsts - The buffers into which bytes are to be transferred. + * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream. + * @throws IOException if some other I/O error occurs + */ + @Override + public long read(ByteBuffer[] dsts) throws IOException { + return socketChannel.read(dsts); + } + + /** + * Reads a sequence of bytes from this channel into a subsequence of the given buffers. + * @param dsts - The buffers into which bytes are to be transferred + * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length. + * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset + * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream. + * @throws IOException if some other I/O error occurs + */ + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return socketChannel.read(dsts, offset, length); + } + + /** + * Writes a sequence of bytes to this channel from the given buffer. 
+ * + * @param src The buffer from which bytes are to be retrieved + * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream + * @throws IOException If some other I/O error occurs + */ + @Override + public int write(ByteBuffer src) throws IOException { + return socketChannel.write(src); + } + + /** + * Writes a sequence of bytes to this channel from the given buffer. + * + * @param srcs The buffer from which bytes are to be retrieved + * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream + * @throws IOException If some other I/O error occurs + */ + @Override + public long write(ByteBuffer[] srcs) throws IOException { + return socketChannel.write(srcs); + } + + /** + * Writes a sequence of bytes to this channel from the subsequence of the given buffers. + * + * @param srcs The buffers from which bytes are to be retrieved + * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length. + * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset. + * @return returns no.of bytes written , possibly zero. + * @throws IOException If some other I/O error occurs + */ + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + return socketChannel.write(srcs, offset, length); + } + + /** + * always returns false as there will be not be any + * pending writes since we directly write to socketChannel. + */ + @Override + public boolean hasPendingWrites() { + return false; + } + + /** + * Returns ANONYMOUS as Principal. + */ + @Override + public Principal peerPrincipal() throws IOException { + return principal; + } + + /** + * Adds the interestOps to selectionKey. 
+ * @param interestOps + */ + @Override + public void addInterestOps(int ops) { + key.interestOps(key.interestOps() | ops); + + } + + /** + * Removes the interestOps from selectionKey. + * @param interestOps + */ + @Override + public void removeInterestOps(int ops) { + key.interestOps(key.interestOps() & ~ops); + } + + @Override + public boolean isMute() { + return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0; + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java new file mode 100644 index 0000000..88c218b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.network; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Map; + +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.security.ssl.SSLFactory; +import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.KafkaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SSLChannelBuilder implements ChannelBuilder { + private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class); + private SSLFactory sslFactory; + private PrincipalBuilder principalBuilder; + private SSLFactory.Mode mode; + + public SSLChannelBuilder(SSLFactory.Mode mode) { + this.mode = mode; + } + + public void configure(Map<String, ?> configs) throws KafkaException { + try { + this.sslFactory = new SSLFactory(mode); + this.sslFactory.configure(configs); + this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); + this.principalBuilder.configure(configs); + } catch (Exception e) { + throw new KafkaException(e); + } + } + + public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException { + KafkaChannel channel = null; + try { + SocketChannel socketChannel = (SocketChannel) key.channel(); + SSLTransportLayer transportLayer = new SSLTransportLayer(id, key, + sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), + socketChannel.socket().getPort())); + Authenticator authenticator = new DefaultAuthenticator(); + authenticator.configure(transportLayer, this.principalBuilder); + channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize); + } catch (Exception e) { + log.info("Failed to create channel due to ", e); + throw new KafkaException(e); + } + return channel; + } + + public void close() { + 
this.principalBuilder.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java new file mode 100644 index 0000000..8ba1b01 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java @@ -0,0 +1,690 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.network; + +import java.io.IOException; +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.CancelledKeyException; + +import java.security.Principal; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLPeerUnverifiedException; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * Transport layer for SSL communication + */ + +public class SSLTransportLayer implements TransportLayer { + private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class); + private final String channelId; + protected final SSLEngine sslEngine; + private final SelectionKey key; + private final SocketChannel socketChannel; + private HandshakeStatus handshakeStatus; + private SSLEngineResult handshakeResult; + private boolean handshakeComplete = false; + private boolean closing = false; + private ByteBuffer netReadBuffer; + private ByteBuffer netWriteBuffer; + private ByteBuffer appReadBuffer; + private ByteBuffer emptyBuf = ByteBuffer.allocate(0); + + public SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { + this.channelId = channelId; + this.key = key; + this.socketChannel = (SocketChannel) key.channel(); + this.sslEngine = sslEngine; + this.netReadBuffer = ByteBuffer.allocate(packetBufferSize()); + this.netWriteBuffer = ByteBuffer.allocate(packetBufferSize()); + this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize()); + startHandshake(); + } + + /** + * starts sslEngine handshake process + */ + private void startHandshake() throws IOException { + //clear & set netRead & netWrite buffers + 
netWriteBuffer.position(0); + netWriteBuffer.limit(0); + netReadBuffer.position(0); + netReadBuffer.limit(0); + handshakeComplete = false; + closing = false; + //initiate handshake + sslEngine.beginHandshake(); + handshakeStatus = sslEngine.getHandshakeStatus(); + } + + @Override + public boolean ready() { + return handshakeComplete; + } + + /** + * does socketChannel.finishConnect() + */ + @Override + public void finishConnect() throws IOException { + socketChannel.finishConnect(); + key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); + } + + /** + * disconnects selectionKey. + */ + @Override + public void disconnect() { + key.cancel(); + } + + @Override + public SocketChannel socketChannel() { + return socketChannel; + } + + @Override + public boolean isOpen() { + return socketChannel.isOpen(); + } + + @Override + public boolean isConnected() { + return socketChannel.isConnected(); + } + + + /** + * Sends a SSL close message and closes socketChannel. + * @throws IOException if an I/O error occurs + * @throws IOException if there is data on the outgoing network buffer and we are unable to flush it + */ + @Override + public void close() throws IOException { + if (closing) return; + closing = true; + sslEngine.closeOutbound(); + try { + if (!flush(netWriteBuffer)) { + throw new IOException("Remaining data in the network buffer, can't send SSL close message."); + } + //prep the buffer for the close message + netWriteBuffer.clear(); + //perform the close, since we called sslEngine.closeOutbound + SSLEngineResult handshake = sslEngine.wrap(emptyBuf, netWriteBuffer); + //we should be in a close state + if (handshake.getStatus() != SSLEngineResult.Status.CLOSED) { + throw new IOException("Invalid close state, will not send network data."); + } + netWriteBuffer.flip(); + flush(netWriteBuffer); + socketChannel.socket().close(); + socketChannel.close(); + } catch (IOException ie) { + log.warn("Failed to send SSL Close message ", ie); + } + 
key.attach(null); + key.cancel(); + } + + /** + * returns true if there are any pending contents in netWriteBuffer + */ + @Override + public boolean hasPendingWrites() { + return netWriteBuffer.hasRemaining(); + } + + /** + * Flushes the buffer to the network, non blocking + * @param buf ByteBuffer + * @return boolean true if the buffer has been emptied out, false otherwise + * @throws IOException + */ + private boolean flush(ByteBuffer buf) throws IOException { + int remaining = buf.remaining(); + if (remaining > 0) { + int written = socketChannel.write(buf); + return written >= remaining; + } + return true; + } + + /** + * Performs SSL handshake, non blocking. + * Before application data (kafka protocols) can be sent client & kafka broker must + * perform ssl handshake. + * During the handshake SSLEngine generates encrypted data that will be transported over socketChannel. + * Each SSLEngine operation generates SSLEngineResult , of which SSLEngineResult.handshakeStatus field is used to + * determine what operation needs to occur to move handshake along. + * A typical handshake might look like this. 
+ * +-------------+----------------------------------+-------------+ + * | client | SSL/TLS message | HSStatus | + * +-------------+----------------------------------+-------------+ + * | wrap() | ClientHello | NEED_UNWRAP | + * | unwrap() | ServerHello/Cert/ServerHelloDone | NEED_WRAP | + * | wrap() | ClientKeyExchange | NEED_WRAP | + * | wrap() | ChangeCipherSpec | NEED_WRAP | + * | wrap() | Finished | NEED_UNWRAP | + * | unwrap() | ChangeCipherSpec | NEED_UNWRAP | + * | unwrap() | Finished | FINISHED | + * +-------------+----------------------------------+-------------+ + * + * @throws IOException + */ + @Override + public void handshake() throws IOException { + boolean read = key.isReadable(); + boolean write = key.isWritable(); + handshakeComplete = false; + handshakeStatus = sslEngine.getHandshakeStatus(); + if (!flush(netWriteBuffer)) { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + return; + } + try { + switch (handshakeStatus) { + case NEED_TASK: + log.trace("SSLHandshake NEED_TASK channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", + channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); + handshakeStatus = runDelegatedTasks(); + break; + case NEED_WRAP: + log.trace("SSLHandshake NEED_WRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", + channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); + handshakeResult = handshakeWrap(write); + if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) { + int currentPacketBufferSize = packetBufferSize(); + netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentPacketBufferSize); + if (netWriteBuffer.position() >= currentPacketBufferSize) { + throw new IllegalStateException("Buffer overflow when available data size (" + netWriteBuffer.position() + + ") >= network buffer size (" + currentPacketBufferSize + ")"); + } + } else if (handshakeResult.getStatus() == 
Status.BUFFER_UNDERFLOW) { + throw new IllegalStateException("Should not have received BUFFER_UNDERFLOW during handshake WRAP."); + } else if (handshakeResult.getStatus() == Status.CLOSED) { + throw new EOFException(); + } + log.trace("SSLHandshake NEED_WRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", + channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); + //if handshake status is not NEED_UNWRAP or unable to flush netWriteBuffer contents + //we will break here otherwise we can do need_unwrap in the same call. + if (handshakeStatus != HandshakeStatus.NEED_UNWRAP || !flush(netWriteBuffer)) { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + break; + } + case NEED_UNWRAP: + log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", + channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); + handshakeResult = handshakeUnwrap(read); + if (handshakeResult.getStatus() == Status.BUFFER_UNDERFLOW) { + int currentPacketBufferSize = packetBufferSize(); + netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize); + if (netReadBuffer.position() >= currentPacketBufferSize) { + throw new IllegalStateException("Buffer underflow when there is available data"); + } + } else if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) { + int currentAppBufferSize = applicationBufferSize(); + appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize); + if (appReadBuffer.position() > currentAppBufferSize) { + throw new IllegalStateException("Buffer underflow when available data size (" + appReadBuffer.position() + + ") > packet buffer size (" + currentAppBufferSize + ")"); + } + } else if (handshakeResult.getStatus() == Status.CLOSED) { + throw new EOFException("SSL handshake status CLOSED during handshake UNWRAP"); + } + 
log.trace("SSLHandshake NEED_UNWRAP channelId {}, handshakeResult {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", + channelId, handshakeResult, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); + + //if handshakeStatus completed than fall-through to finished status. + //after handshake is finished there is no data left to read/write in socketChannel. + //so the selector won't invoke this channel if we don't go through the handshakeFinished here. + if (handshakeStatus != HandshakeStatus.FINISHED) { + if (handshakeStatus == HandshakeStatus.NEED_WRAP) { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } else if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + } + break; + } + case FINISHED: + handshakeFinished(); + break; + case NOT_HANDSHAKING: + handshakeFinished(); + break; + default: + throw new IllegalStateException(String.format("Unexpected status [%s]", handshakeStatus)); + } + } catch (SSLException e) { + handshakeFailure(); + throw e; + } + } + + + /** + * Executes the SSLEngine tasks needed. + * @return HandshakeStatus + */ + private HandshakeStatus runDelegatedTasks() { + for (;;) { + Runnable task = delegatedTask(); + if (task == null) { + break; + } + task.run(); + } + return sslEngine.getHandshakeStatus(); + } + + /** + * Checks if the handshake status is finished + * Sets the interestOps for the selectionKey. + */ + private void handshakeFinished() throws IOException { + // SSLEnginge.getHandshakeStatus is transient and it doesn't record FINISHED status properly. + // It can move from FINISHED status to NOT_HANDSHAKING after the handshake is completed. 
+ // Hence we also need to check handshakeResult.getHandshakeStatus() if the handshake finished or not + if (handshakeResult.getHandshakeStatus() == HandshakeStatus.FINISHED) { + //we are complete if we have delivered the last package + handshakeComplete = !netWriteBuffer.hasRemaining(); + //remove OP_WRITE if we are complete, otherwise we still have data to write + if (!handshakeComplete) + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + else + key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + + log.trace("SSLHandshake FINISHED channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} ", + channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); + } else { + throw new IOException("NOT_HANDSHAKING during handshake"); + } + } + + /** + * Performs the WRAP function + * @param doWrite boolean + * @return SSLEngineResult + * @throws IOException + */ + private SSLEngineResult handshakeWrap(Boolean doWrite) throws IOException { + log.trace("SSLHandshake handshakeWrap", channelId); + if (netWriteBuffer.hasRemaining()) + throw new IllegalStateException("handshakeWrap called with netWriteBuffer not empty"); + //this should never be called with a network buffer that contains data + //so we can clear it here. 
+ netWriteBuffer.clear(); + SSLEngineResult result = sslEngine.wrap(emptyBuf, netWriteBuffer); + //prepare the results to be written + netWriteBuffer.flip(); + handshakeStatus = result.getHandshakeStatus(); + if (result.getStatus() == SSLEngineResult.Status.OK && + result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + handshakeStatus = runDelegatedTasks(); + } + + if (doWrite) flush(netWriteBuffer); + return result; + } + + /** + * Perform handshake unwrap + * @param doRead boolean + * @return SSLEngineResult + * @throws IOException + */ + private SSLEngineResult handshakeUnwrap(Boolean doRead) throws IOException { + log.trace("SSLHandshake handshakeUnwrap", channelId); + SSLEngineResult result; + boolean cont = false; + int read = 0; + if (doRead) { + read = socketChannel.read(netReadBuffer); + if (read == -1) throw new EOFException("EOF during handshake."); + } + do { + //prepare the buffer with the incoming data + netReadBuffer.flip(); + result = sslEngine.unwrap(netReadBuffer, appReadBuffer); + netReadBuffer.compact(); + handshakeStatus = result.getHandshakeStatus(); + if (result.getStatus() == SSLEngineResult.Status.OK && + result.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + handshakeStatus = runDelegatedTasks(); + } + cont = result.getStatus() == SSLEngineResult.Status.OK && + handshakeStatus == HandshakeStatus.NEED_UNWRAP; + log.trace("SSLHandshake handshakeUnwrap: handshakeStatus ", handshakeStatus); + } while (netReadBuffer.position() != 0 && cont); + + return result; + } + + + /** + * Reads a sequence of bytes from this channel into the given buffer. 
+ * + * @param dst The buffer into which bytes are to be transferred + * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream + * @throws IOException if some other I/O error occurs + */ + @Override + public int read(ByteBuffer dst) throws IOException { + if (closing) return -1; + int read = 0; + if (!handshakeComplete) return read; + + //if we have unread decrypted data in appReadBuffer read that into dst buffer. + if (appReadBuffer.position() > 0) { + read = readFromAppBuffer(dst); + } + + if (dst.remaining() > 0) { + netReadBuffer = Utils.ensureCapacity(netReadBuffer, packetBufferSize()); + if (netReadBuffer.remaining() > 0) { + int netread = socketChannel.read(netReadBuffer); + if (netread == 0) return netread; + else if (netread < 0) throw new EOFException("EOF during read"); + } + do { + netReadBuffer.flip(); + SSLEngineResult unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer); + netReadBuffer.compact(); + // handle ssl renegotiation. 
+ if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) { + log.trace("SSLChannel Read begin renegotiation channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", + channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position()); + handshake(); + break; + } + + if (unwrapResult.getStatus() == Status.OK) { + read += readFromAppBuffer(dst); + } else if (unwrapResult.getStatus() == Status.BUFFER_OVERFLOW) { + int currentApplicationBufferSize = applicationBufferSize(); + appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentApplicationBufferSize); + if (appReadBuffer.position() >= currentApplicationBufferSize) { + throw new IllegalStateException("Buffer overflow when available data size (" + appReadBuffer.position() + + ") >= application buffer size (" + currentApplicationBufferSize + ")"); + } + + // appReadBuffer will extended upto currentApplicationBufferSize + // we need to read the existing content into dst before we can do unwrap again. If there are no space in dst + // we can break here. + if (dst.hasRemaining()) + read += readFromAppBuffer(dst); + else + break; + } else if (unwrapResult.getStatus() == Status.BUFFER_UNDERFLOW) { + int currentPacketBufferSize = packetBufferSize(); + netReadBuffer = Utils.ensureCapacity(netReadBuffer, currentPacketBufferSize); + if (netReadBuffer.position() >= currentPacketBufferSize) { + throw new IllegalStateException("Buffer underflow when available data size (" + netReadBuffer.position() + + ") > packet buffer size (" + currentPacketBufferSize + ")"); + } + break; + } else if (unwrapResult.getStatus() == Status.CLOSED) { + throw new EOFException(); + } + } while (netReadBuffer.position() != 0); + } + return read; + } + + + /** + * Reads a sequence of bytes from this channel into the given buffers. + * + * @param dsts - The buffers into which bytes are to be transferred. 
+ * @return The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream. + * @throws IOException if some other I/O error occurs + */ + @Override + public long read(ByteBuffer[] dsts) throws IOException { + return read(dsts, 0, dsts.length); + } + + + /** + * Reads a sequence of bytes from this channel into a subsequence of the given buffers. + * @param dsts - The buffers into which bytes are to be transferred + * @param offset - The offset within the buffer array of the first buffer into which bytes are to be transferred; must be non-negative and no larger than dsts.length. + * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than dsts.length - offset + * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream. + * @throws IOException if some other I/O error occurs + */ + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) + throw new IndexOutOfBoundsException(); + + int totalRead = 0; + int i = offset; + while (i < length) { + if (dsts[i].hasRemaining()) { + int read = read(dsts[i]); + if (read > 0) + totalRead += read; + else + break; + } + if (!dsts[i].hasRemaining()) { + i++; + } + } + return totalRead; + } + + + /** + * Writes a sequence of bytes to this channel from the given buffer. 
+ * + * @param src The buffer from which bytes are to be retrieved + * @returns The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream + * @throws IOException If some other I/O error occurs + */ + @Override + public int write(ByteBuffer src) throws IOException { + int written = 0; + if (closing) throw new IllegalStateException("Channel is in closing state"); + if (!handshakeComplete) return written; + + if (!flush(netWriteBuffer)) + return written; + + netWriteBuffer.clear(); + SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer); + netWriteBuffer.flip(); + + //handle ssl renegotiation + if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) { + handshake(); + return written; + } + + if (wrapResult.getStatus() == Status.OK) { + written = wrapResult.bytesConsumed(); + flush(netWriteBuffer); + } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) { + int currentPacketBufferSize = packetBufferSize(); + netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, packetBufferSize()); + if (netWriteBuffer.position() >= currentPacketBufferSize) + throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.position() + ") >= network buffer size (" + currentPacketBufferSize + ")"); + } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) { + throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write"); + } else if (wrapResult.getStatus() == Status.CLOSED) { + throw new EOFException(); + } + return written; + } + + /** + * Writes a sequence of bytes to this channel from the subsequence of the given buffers. + * + * @param srcs The buffers from which bytes are to be retrieved + * @param offset The offset within the buffer array of the first buffer from which bytes are to be retrieved; must be non-negative and no larger than srcs.length. + * @param length - The maximum number of buffers to be accessed; must be non-negative and no larger than srcs.length - offset. 
+ * @return returns no.of bytes written , possibly zero. + * @throws IOException If some other I/O error occurs + */ + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) + throw new IndexOutOfBoundsException(); + int totalWritten = 0; + int i = offset; + while (i < length) { + if (srcs[i].hasRemaining() || hasPendingWrites()) { + int written = write(srcs[i]); + if (written > 0) { + totalWritten += written; + } + } + if (!srcs[i].hasRemaining() && !hasPendingWrites()) { + i++; + } else { + // if we are unable to write the current buffer to socketChannel we should break, + // as we might have reached max socket send buffer size. + break; + } + } + return totalWritten; + } + + /** + * Writes a sequence of bytes to this channel from the given buffers. + * + * @param srcs The buffers from which bytes are to be retrieved + * @return returns no.of bytes consumed by SSLEngine.wrap , possibly zero. + * @throws IOException If some other I/O error occurs + */ + @Override + public long write(ByteBuffer[] srcs) throws IOException { + return write(srcs, 0, srcs.length); + } + + + /** + * SSLSession's peerPrincipal for the remote host. 
+ * @return Principal + */ + public Principal peerPrincipal() throws IOException { + try { + return sslEngine.getSession().getPeerPrincipal(); + } catch (SSLPeerUnverifiedException se) { + throw new IOException(String.format("Unable to retrieve getPeerPrincipal due to %s", se)); + } + } + + /** + * returns a SSL Session after the handshake is established + * throws IllegalStateException if the handshake is not established + */ + public SSLSession sslSession() throws IllegalStateException { + return sslEngine.getSession(); + } + + /** + * Adds interestOps to SelectionKey of the TransportLayer + * @param ops SelectionKey interestOps + */ + @Override + public void addInterestOps(int ops) { + if (!key.isValid()) + throw new CancelledKeyException(); + else if (!handshakeComplete) + throw new IllegalStateException("handshake is not completed"); + + key.interestOps(key.interestOps() | ops); + } + + /** + * removes interestOps to SelectionKey of the TransportLayer + * @param ops SelectionKey interestOps + */ + @Override + public void removeInterestOps(int ops) { + if (!key.isValid()) + throw new CancelledKeyException(); + else if (!handshakeComplete) + throw new IllegalStateException("handshake is not completed"); + + key.interestOps(key.interestOps() & ~ops); + } + + + /** + * returns delegatedTask for the SSLEngine. 
+ */ + protected Runnable delegatedTask() { + return sslEngine.getDelegatedTask(); + } + + /** + * transfers appReadBuffer contents (decrypted data) into dst bytebuffer + * @param dst ByteBuffer + */ + private int readFromAppBuffer(ByteBuffer dst) { + appReadBuffer.flip(); + int remaining = Math.min(appReadBuffer.remaining(), dst.remaining()); + if (remaining > 0) { + int limit = appReadBuffer.limit(); + appReadBuffer.limit(appReadBuffer.position() + remaining); + dst.put(appReadBuffer); + appReadBuffer.limit(limit); + } + appReadBuffer.compact(); + return remaining; + } + + private int packetBufferSize() { + return sslEngine.getSession().getPacketBufferSize(); + } + + private int applicationBufferSize() { + return sslEngine.getSession().getApplicationBufferSize(); + } + + private void handshakeFailure() { + //Release all resources such as internal buffers that SSLEngine is managing + sslEngine.closeOutbound(); + try { + sslEngine.closeInbound(); + } catch (SSLException e) { + log.debug("SSLEngine.closeInBound() raised an exception.", e); + } + } + + @Override + public boolean isMute() { + return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selectable.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index 618a0fa..39eae4a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -3,15 +3,16 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; @@ -103,4 +104,9 @@ public interface Selectable { */ public void unmuteAll(); -} \ No newline at end of file + /** + * returns true if a channel is ready + * @param id The id for the connection + */ + public boolean isChannelReady(String id); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index ce20111..12c911c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
@@ -43,26 +43,26 @@ import org.slf4j.LoggerFactory; * responses. * <p> * A connection can be added to the nioSelector associated with an integer id by doing - * + * * <pre> * nioSelector.connect("42", new InetSocketAddress("google.com", server.port), 64000, 64000); * </pre> - * + * * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating * the connection. The successful invocation of this method does not mean a valid connection has been established. - * + * * Sending requests, receiving responses, processing connection completions, and disconnections on the existing * connections are all done using the <code>poll()</code> call. - * + * * <pre> * nioSelector.send(new NetworkSend(myDestination, myBytes)); * nioSelector.send(new NetworkSend(myOtherDestination, myOtherBytes)); * nioSelector.poll(TIMEOUT_MS); * </pre> - * + * * The nioSelector maintains several lists that are reset by each call to <code>poll()</code> which are available via * various getters. These are reset by each call to <code>poll()</code>. - * + * * This class is not thread safe! 
*/ public class Selector implements Selectable { @@ -70,9 +70,10 @@ public class Selector implements Selectable { private static final Logger log = LoggerFactory.getLogger(Selector.class); private final java.nio.channels.Selector nioSelector; - private final Map<String, SelectionKey> keys; + private final Map<String, KafkaChannel> channels; private final List<Send> completedSends; private final List<NetworkReceive> completedReceives; + private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives; private final List<String> disconnected; private final List<String> connected; private final List<String> failedSends; @@ -80,6 +81,7 @@ public class Selector implements Selectable { private final SelectorMetrics sensors; private final String metricGrpPrefix; private final Map<String, String> metricTags; + private final ChannelBuilder channelBuilder; private final Map<String, Long> lruConnections; private final long connectionsMaxIdleNanos; private final int maxReceiveSize; @@ -91,7 +93,7 @@ public class Selector implements Selectable { /** * Create a new nioSelector */ - public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) { + public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) { try { this.nioSelector = java.nio.channels.Selector.open(); } catch (IOException e) { @@ -102,13 +104,15 @@ public class Selector implements Selectable { this.time = time; this.metricGrpPrefix = metricGrpPrefix; this.metricTags = metricTags; - this.keys = new HashMap<String, SelectionKey>(); + this.channels = new HashMap<String, KafkaChannel>(); this.completedSends = new ArrayList<Send>(); this.completedReceives = new ArrayList<NetworkReceive>(); + this.stagedReceives = new HashMap<KafkaChannel, 
Deque<NetworkReceive>>(); this.connected = new ArrayList<String>(); this.disconnected = new ArrayList<String>(); this.failedSends = new ArrayList<String>(); this.sensors = new SelectorMetrics(metrics); + this.channelBuilder = channelBuilder; // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true); currentTimeNanos = new SystemTime().nanoseconds(); @@ -116,8 +120,8 @@ public class Selector implements Selectable { this.metricsPerConnection = metricsPerConnection; } - public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) { - this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true); + public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, ChannelBuilder channelBuilder) { + this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder); } /** @@ -135,28 +139,29 @@ public class Selector implements Selectable { */ @Override public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { - if (this.keys.containsKey(id)) + if (this.channels.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); - SocketChannel channel = SocketChannel.open(); - channel.configureBlocking(false); - Socket socket = channel.socket(); + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + Socket socket = socketChannel.socket(); socket.setKeepAlive(true); socket.setSendBufferSize(sendBufferSize); socket.setReceiveBufferSize(receiveBufferSize); socket.setTcpNoDelay(true); try { - channel.connect(address); + socketChannel.connect(address); } catch (UnresolvedAddressException e) { - 
channel.close(); + socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { - channel.close(); + socketChannel.close(); throw e; } - SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT); - key.attach(new Transmissions(id)); - this.keys.put(id, key); + SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); + KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); + key.attach(channel); + this.channels.put(id, channel); } /** @@ -164,10 +169,11 @@ public class Selector implements Selectable { * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector * Note that we are not checking if the connection id is valid - since the connection already exists */ - public void register(String id, SocketChannel channel) throws ClosedChannelException { - SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ); - key.attach(new Transmissions(id)); - this.keys.put(id, key); + public void register(String id, SocketChannel socketChannel) throws ClosedChannelException { + SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ); + KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); + key.attach(channel); + this.channels.put(id, channel); } /** @@ -176,9 +182,9 @@ public class Selector implements Selectable { */ @Override public void disconnect(String id) { - SelectionKey key = this.keys.get(id); - if (key != null) - key.cancel(); + KafkaChannel channel = channelForId(id); + if (channel != null) + channel.disconnect(); } /** @@ -194,14 +200,15 @@ public class Selector implements Selectable { */ @Override public void close() { - List<String> connections = new LinkedList<String>(keys.keySet()); + List<String> connections = new LinkedList<String>(channels.keySet()); for (String id: connections) close(id); - try { this.nioSelector.close(); } catch 
(IOException e) { log.error("Exception closing nioSelector:", e); + } catch (SecurityException se) { + log.error("Exception closing nioSelector:", se); } } @@ -210,28 +217,38 @@ public class Selector implements Selectable { * @param send The request to send */ public void send(Send send) { - SelectionKey key = keyForId(send.destination()); - Transmissions transmissions = transmissions(key); - if (transmissions.hasSend()) - throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); - transmissions.send = send; + KafkaChannel channel = channelForId(send.destination()); + if (channel == null) + throw new IllegalStateException("channel is not connected"); + try { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + channel.setSend(send); } catch (CancelledKeyException e) { - close(transmissions.id); this.failedSends.add(send.destination()); + close(channel); } } /** * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing * disconnections, initiating new sends, or making progress on in-progress sends or receives. - * + * * When this call is completed the user can check for completed sends, receives, connections or disconnects using * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These * lists will be cleared at the beginning of each {@link #poll(long)} call and repopulated by the call if there is * any completed I/O. - * + * + * In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting, + * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses. + * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrpyted + * we won't be able to read exact no.of bytes as kafka protocol requires. 
We read as many bytes as we can, up to SSLEngine's + * application buffer size. This means we might be reading additional bytes than the requested size. + * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes + * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are + * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during + * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0 + * and pop response and add to the completedReceives. + * * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely. * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send @@ -239,7 +256,8 @@ public class Selector implements Selectable { @Override public void poll(long timeout) throws IOException { clear(); - + if (hasStagedReceives()) + timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); int readyKeys = select(timeout); @@ -253,85 +271,73 @@ public class Selector implements Selectable { while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); - - Transmissions transmissions = transmissions(key); - SocketChannel channel = channel(key); + KafkaChannel channel = channel(key); // register all per-connection metrics at once - sensors.maybeRegisterConnectionMetrics(transmissions.id); - lruConnections.put(transmissions.id, currentTimeNanos); + sensors.maybeRegisterConnectionMetrics(channel.id()); + lruConnections.put(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake */ if (key.isConnectable()) { channel.finishConnect(); - key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); - 
this.connected.add(transmissions.id); + this.connected.add(channel.id()); this.sensors.connectionCreated.record(); - log.debug("Connection {} created", transmissions.id); } - /* read from any connections that have readable data */ - if (key.isReadable()) { - if (!transmissions.hasReceive()) - transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id); + /* if channel is not ready finish prepare */ + if (channel.isConnected() && !channel.ready()) + channel.prepare(); + + /* if channel is ready read from any connections that have readable data */ + if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { + NetworkReceive networkReceive; try { - transmissions.receive.readFrom(channel); + while ((networkReceive = channel.read()) != null) { + addToStagedReceives(channel, networkReceive); + } } catch (InvalidReceiveException e) { - log.error("Invalid data received from " + transmissions.id + " closing connection", e); - close(transmissions.id); - this.disconnected.add(transmissions.id); + log.error("Invalid data received from " + channel.id() + " closing connection", e); + close(channel); + this.disconnected.add(channel.id()); throw e; } - if (transmissions.receive.complete()) { - transmissions.receive.payload().rewind(); - this.completedReceives.add(transmissions.receive); - this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit()); - transmissions.clearReceive(); - } } - /* write to any sockets that have space in their buffer and for which we have data */ - if (key.isWritable()) { - transmissions.send.writeTo(channel); - if (transmissions.send.completed()) { - this.completedSends.add(transmissions.send); - this.sensors.recordBytesSent(transmissions.id, transmissions.send.size()); - transmissions.clearSend(); - key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); + /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ + if (channel.ready() && 
key.isWritable()) { + Send send = channel.write(); + if (send != null) { + this.completedSends.add(send); + this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { - close(transmissions.id); - this.disconnected.add(transmissions.id); + close(channel); + this.disconnected.add(channel.id()); } } catch (IOException e) { - String desc = socketDescription(channel); + String desc = channel.socketDescription(); if (e instanceof EOFException || e instanceof ConnectException) log.debug("Connection {} disconnected", desc); else log.warn("Error in I/O with connection to {}", desc, e); - close(transmissions.id); - this.disconnected.add(transmissions.id); + close(channel); + this.disconnected.add(channel.id()); } } } + + addToCompletedReceives(); + long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); maybeCloseOldestConnection(); } - private String socketDescription(SocketChannel channel) { - Socket socket = channel.socket(); - if (socket == null) - return "[unconnected socket]"; - else if (socket.getInetAddress() != null) - return socket.getInetAddress().toString(); - else - return socket.getLocalAddress().toString(); - } + @Override public List<Send> completedSends() { @@ -355,32 +361,34 @@ public class Selector implements Selectable { @Override public void mute(String id) { - mute(this.keyForId(id)); + KafkaChannel channel = channelForId(id); + mute(channel); } - private void mute(SelectionKey key) { - key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + private void mute(KafkaChannel channel) { + channel.mute(); } @Override public void unmute(String id) { - unmute(this.keyForId(id)); + KafkaChannel channel = channelForId(id); + unmute(channel); } - private void unmute(SelectionKey key) { - key.interestOps(key.interestOps() | SelectionKey.OP_READ); + private void unmute(KafkaChannel channel) { + channel.unmute(); } @Override public void muteAll() { - for 
(SelectionKey key : this.keys.values()) - mute(key); + for (KafkaChannel channel : this.channels.values()) + mute(channel); } @Override public void unmuteAll() { - for (SelectionKey key : this.keys.values()) - unmute(key); + for (KafkaChannel channel : this.channels.values()) + unmute(channel); } private void maybeCloseOldestConnection() { @@ -418,7 +426,7 @@ public class Selector implements Selectable { /** * Check for data, waiting up to the given timeout. - * + * * @param ms Length of time to wait, in milliseconds. If negative, wait indefinitely. * @return The number of keys ready * @throws IOException @@ -434,81 +442,107 @@ public class Selector implements Selectable { /** * Begin closing this connection + * @param id channel id */ public void close(String id) { - SelectionKey key = keyForId(id); - lruConnections.remove(id); - SocketChannel channel = channel(key); - Transmissions trans = transmissions(key); - if (trans != null) { - this.keys.remove(trans.id); - trans.clearReceive(); - trans.clearSend(); - } - key.attach(null); - key.cancel(); + KafkaChannel channel = this.channels.get(id); + if (channel != null) + close(channel); + } + + /** + * Begin closing this connection + */ + private void close(KafkaChannel channel) { try { - channel.socket().close(); channel.close(); } catch (IOException e) { - log.error("Exception closing connection to node {}:", trans.id, e); + log.error("Exception closing connection to node {}:", channel.id(), e); } + this.stagedReceives.remove(channel); + this.channels.remove(channel.id()); + this.lruConnections.remove(channel.id()); this.sensors.connectionClosed.record(); } /** - * Get the selection key associated with this numeric id + * check if channel is ready */ - private SelectionKey keyForId(String id) { - SelectionKey key = this.keys.get(id); - if (key == null) - throw new IllegalStateException("Attempt to write to socket for which there is no open connection. 
Connection id " + id + " existing connections " + keys.keySet().toString()); - return key; + @Override + public boolean isChannelReady(String id) { + KafkaChannel channel = channelForId(id); + return channel.ready(); } /** - * Get the transmissions for the given connection + * Get the channel associated with this numeric id */ - private Transmissions transmissions(SelectionKey key) { - return (Transmissions) key.attachment(); + private KafkaChannel channelForId(String id) { + KafkaChannel channel = this.channels.get(id); + if (channel == null) + throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString()); + return channel; } /** - * Get the socket channel associated with this selection key + * Get the channel associated with selectionKey */ - private SocketChannel channel(SelectionKey key) { - return (SocketChannel) key.channel(); + private KafkaChannel channel(SelectionKey key) { + return (KafkaChannel) key.attachment(); } /** - * The id and in-progress send and receive associated with a connection + * Check if given channel has a staged receive */ - private static class Transmissions { - public String id; - public Send send; - public NetworkReceive receive; + private boolean hasStagedReceive(KafkaChannel channel) { + return stagedReceives.containsKey(channel); + } - public Transmissions(String id) { - this.id = id; + /** + * check if stagedReceives have unmuted channel + */ + private boolean hasStagedReceives() { + for (KafkaChannel channel : this.stagedReceives.keySet()) { + if (!channel.isMute()) + return true; } + return false; + } - public boolean hasSend() { - return this.send != null; - } - public void clearSend() { - this.send = null; - } + /** + * adds a receive to staged receieves + */ + private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) { + if (!stagedReceives.containsKey(channel)) + 
stagedReceives.put(channel, new ArrayDeque<NetworkReceive>()); - public boolean hasReceive() { - return this.receive != null; - } + Deque<NetworkReceive> deque = stagedReceives.get(channel); + deque.add(receive); + } - public void clearReceive() { - this.receive = null; + /** + * checks if there are any staged receives and adds to completedReceives + */ + private void addToCompletedReceives() { + if (this.stagedReceives.size() > 0) { + Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next(); + KafkaChannel channel = entry.getKey(); + if (!channel.isMute()) { + Deque<NetworkReceive> deque = entry.getValue(); + NetworkReceive networkReceive = deque.poll(); + this.completedReceives.add(networkReceive); + this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); + if (deque.size() == 0) + iter.remove(); + } + } } } + private class SelectorMetrics { private final Metrics metrics; public final Sensor connectionClosed; @@ -575,7 +609,7 @@ public class Selector implements Selectable { metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags); this.metrics.addMetric(metricName, new Measurable() { public double measure(MetricConfig config, long now) { - return keys.size(); + return channels.size(); } }); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/Send.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java index 8f6daad..e0d8831 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Send.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java @@ -3,9 +3,9 @@ * file distributed with 
this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -33,7 +33,7 @@ public interface Send { /** * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send * to be completely written - * @param channel The channel to write to + * @param channel The Channel to write to * @return The number of bytes written * @throws IOException If the write fails */ http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java new file mode 100644 index 0000000..e9158aa --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.network; + +/* + * Transport layer for underlying communication. + * At very basic level it is wrapper around SocketChannel and can be used as substitue for SocketChannel + * and other network Channel implementations. + * As NetworkClient replaces BlockingChannel and other implementations we will be using KafkaChannel as + * a network I/O channel. + */ +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.channels.GatheringByteChannel; + +import java.security.Principal; + + +public interface TransportLayer extends ScatteringByteChannel, GatheringByteChannel { + + /** + * Returns true if the channel has handshake and authentication done. + */ + boolean ready(); + + /** + * Finishes the process of connecting a socket channel. + */ + void finishConnect() throws IOException; + + /** + * disconnect socketChannel + */ + void disconnect(); + + /** + * Tells whether or not this channel's network socket is connected. 
+ */ + boolean isConnected(); + + /** + * returns underlying socketChannel + */ + SocketChannel socketChannel(); + + + /** + * Performs SSL handshake hence is a no-op for the non-secure + * implementation + * @throws IOException + */ + void handshake() throws IOException; + + /** + * Returns true if there are any pending writes + */ + boolean hasPendingWrites(); + + /** + * returns SSLSession.getPeerPrinicpal if SSLTransportLayer used + * for non-secure returns a "ANONYMOUS" as the peerPrincipal + */ + Principal peerPrincipal() throws IOException; + + void addInterestOps(int ops); + + void removeInterestOps(int ops); + + boolean isMute(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java index dab1a94..a624741 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java @@ -24,6 +24,8 @@ import java.util.Map; public enum SecurityProtocol { /** Un-authenticated, non-encrypted channel */ PLAINTEXT(0, "PLAINTEXT"), + /** SSL channel */ + SSL(1, "SSL"), /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. 
*/ TRACE(Short.MAX_VALUE, "TRACE"); http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java new file mode 100644 index 0000000..fbbeb9e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.auth; + +import java.util.Map; +import java.security.Principal; + +import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.KafkaException; + +/** DefaultPrincipalBuilder which return transportLayer's peer Principal **/ + +public class DefaultPrincipalBuilder implements PrincipalBuilder { + + public void configure(Map<String, ?> configs) {} + + public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException { + try { + return transportLayer.peerPrincipal(); + } catch (Exception e) { + throw new KafkaException("Failed to build principal due to: ", e); + } + } + + public void close() throws KafkaException {} + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java new file mode 100644 index 0000000..277b6ef --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * A simple {@link Principal} backed by a single non-null name.
 * Equality, hash code and string form are all defined by that name.
 */
public class KafkaPrincipal implements Principal {

    /** The principal's name; guaranteed non-null by the constructor. */
    private final String name;

    /**
     * @param name the principal name
     * @throws IllegalArgumentException if {@code name} is null
     */
    public KafkaPrincipal(String name) {
        if (name == null)
            throw new IllegalArgumentException("name is null");
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public boolean equals(Object object) {
        if (this == object)
            return true;
        // instanceof is false for null, so no separate null check is needed.
        if (!(object instanceof KafkaPrincipal))
            return false;
        KafkaPrincipal other = (KafkaPrincipal) object;
        return name.equals(other.getName());
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }

    @Override
    public String toString() {
        return name;
    }

}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.auth; + +/* + * PrincipalBuilder for Authenticator + */ + +import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Configurable; + +import java.util.Map; +import java.security.Principal; + +public interface PrincipalBuilder extends Configurable { + + /** + * configure this class with give key-value pair + */ + public void configure(Map<String, ?> configs); + + /** + * Returns Principal + * @param TransportLayer + * @param Authenticator + */ + Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException; + + /** + * Close this PrincipalBuilder + */ + public void close() throws KafkaException; + +}
