http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index d921507..1e561cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolPB; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.CodedInputStream; +import org.apache.hadoop.crypto.CipherOption; +import org.apache.hadoop.crypto.CipherSuite; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -264,4 +266,104 @@ public class PBHelperClient { assert size >= 0; return new ExactSizeInputStream(input, size); } + + public static CipherOption convert(HdfsProtos.CipherOptionProto proto) { + if (proto != null) { + CipherSuite suite = null; + if (proto.getSuite() != null) { + suite = convert(proto.getSuite()); + } + byte[] inKey = null; + if (proto.getInKey() != null) { + inKey = proto.getInKey().toByteArray(); + } + byte[] inIv = null; + if (proto.getInIv() != null) { + inIv = proto.getInIv().toByteArray(); + } + byte[] outKey = null; + if (proto.getOutKey() != null) { + outKey = proto.getOutKey().toByteArray(); + } + byte[] outIv = null; + if (proto.getOutIv() != null) { + outIv = proto.getOutIv().toByteArray(); + } + return new CipherOption(suite, inKey, inIv, outKey, outIv); + } + return null; + } + + public static CipherSuite convert(HdfsProtos.CipherSuiteProto proto) { + switch (proto) { + case AES_CTR_NOPADDING: + return CipherSuite.AES_CTR_NOPADDING; + default: + // Set to UNKNOWN and stash the unknown enum value + CipherSuite suite = CipherSuite.UNKNOWN; + suite.setUnknownValue(proto.getNumber()); + return suite; + } + } + + public static HdfsProtos.CipherOptionProto convert(CipherOption option) { + if (option != null) { + HdfsProtos.CipherOptionProto.Builder builder = HdfsProtos.CipherOptionProto. + newBuilder(); + if (option.getCipherSuite() != null) { + builder.setSuite(convert(option.getCipherSuite())); + } + if (option.getInKey() != null) { + builder.setInKey(ByteString.copyFrom(option.getInKey())); + } + if (option.getInIv() != null) { + builder.setInIv(ByteString.copyFrom(option.getInIv())); + } + if (option.getOutKey() != null) { + builder.setOutKey(ByteString.copyFrom(option.getOutKey())); + } + if (option.getOutIv() != null) { + builder.setOutIv(ByteString.copyFrom(option.getOutIv())); + } + return builder.build(); + } + return null; + } + + public static HdfsProtos.CipherSuiteProto convert(CipherSuite suite) { + switch (suite) { + case UNKNOWN: + return HdfsProtos.CipherSuiteProto.UNKNOWN; + case AES_CTR_NOPADDING: + return HdfsProtos.CipherSuiteProto.AES_CTR_NOPADDING; + default: + return null; + } + } + + public static List<HdfsProtos.CipherOptionProto> convertCipherOptions( + List<CipherOption> options) { + if (options != null) { + List<HdfsProtos.CipherOptionProto> protos = + Lists.newArrayListWithCapacity(options.size()); + for (CipherOption option : options) { + protos.add(convert(option)); + } + return protos; + } + return null; + } + + public static List<CipherOption> convertCipherOptionProtos( + List<HdfsProtos.CipherOptionProto> protos) { + if (protos != null) { + List<CipherOption> options = + Lists.newArrayListWithCapacity(protos.size()); + for (HdfsProtos.CipherOptionProto proto : protos) { + options.add(convert(proto)); + } + return options; + } + return null; + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 42460ed..ce3fbb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -883,6 +883,9 @@ Release 2.8.0 - UNRELEASED HDFS-8890. Allow admin to specify which blockpools the balancer should run on. (Chris Trezzo via mingma) + HDFS-9002. Move o.a.h.hdfs.net/*Peer classes to hdfs-client. + (Mingliang Liu via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 268a5b9..95e9ad4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -99,7 +99,6 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -3018,7 +3017,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, try { sock = socketFactory.createSocket(); NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(), socketTimeout); - peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this, + peer = DFSUtilClient.peerFromSocketAndKey(saslClient, sock, this, blockToken, datanodeId); peer.setReadTimeout(socketTimeout); peer.setWriteTimeout(socketTimeout); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 6420b55..84858f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -599,14 +599,28 @@ public class DFSConfigKeys extends CommonConfigurationKeys { // Security-related configs public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer"; public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false; - public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = "dfs.encrypt.data.transfer.cipher.key.bitlength"; - public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = 128; - public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites"; + @Deprecated + public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY = + HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY; + @Deprecated + public static final int DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT = + HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT; + @Deprecated + public static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = + HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm"; - public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class"; - public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection"; - public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = ""; - public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class"; + @Deprecated + public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = + HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS; + @Deprecated + public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = + HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; + @Deprecated + public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = + HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT; + @Deprecated + public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = + HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java deleted file mode 100644 index a9f33e7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.net; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.channels.ReadableByteChannel; - -import org.apache.hadoop.net.unix.DomainSocket; - -/** - * Represents a peer that we communicate with by using a basic Socket - * that has no associated Channel. - * - */ -class BasicInetPeer implements Peer { - private final Socket socket; - private final OutputStream out; - private final InputStream in; - private final boolean isLocal; - - public BasicInetPeer(Socket socket) throws IOException { - this.socket = socket; - this.out = socket.getOutputStream(); - this.in = socket.getInputStream(); - this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); - } - - @Override - public ReadableByteChannel getInputStreamChannel() { - /* - * This Socket has no channel, so there's nothing to return here. - */ - return null; - } - - @Override - public void setReadTimeout(int timeoutMs) throws IOException { - socket.setSoTimeout(timeoutMs); - } - - @Override - public int getReceiveBufferSize() throws IOException { - return socket.getReceiveBufferSize(); - } - - @Override - public boolean getTcpNoDelay() throws IOException { - return socket.getTcpNoDelay(); - } - - @Override - public void setWriteTimeout(int timeoutMs) { - /* - * We can't implement write timeouts. :( - * - * Java provides no facility to set a blocking write timeout on a Socket. - * You can simulate a blocking write with a timeout by using - * non-blocking I/O. However, we can't use nio here, because this Socket - * doesn't have an associated Channel. - * - * See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4031100 for - * more details. - */ - } - - @Override - public boolean isClosed() { - return socket.isClosed(); - } - - @Override - public void close() throws IOException { - socket.close(); - } - - @Override - public String getRemoteAddressString() { - return socket.getRemoteSocketAddress().toString(); - } - - @Override - public String getLocalAddressString() { - return socket.getLocalSocketAddress().toString(); - } - - @Override - public InputStream getInputStream() throws IOException { - return in; - } - - @Override - public OutputStream getOutputStream() throws IOException { - return out; - } - - @Override - public boolean isLocal() { - return isLocal; - } - - @Override - public String toString() { - return "BasicInetPeer(" + socket.toString() + ")"; - } - - @Override - public DomainSocket getDomainSocket() { - return null; - } - - @Override - public boolean hasSecureChannel() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java deleted file mode 100644 index da660c7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.net; - -import java.io.IOException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.net.unix.DomainSocket; - -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.channels.ReadableByteChannel; - -/** - * Represents a peer that we communicate with by using an encrypted - * communications medium. - */ [email protected] -public class EncryptedPeer implements Peer { - private final Peer enclosedPeer; - - /** - * An encrypted InputStream. - */ - private final InputStream in; - - /** - * An encrypted OutputStream. - */ - private final OutputStream out; - - /** - * An encrypted ReadableByteChannel. - */ - private final ReadableByteChannel channel; - - public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) { - this.enclosedPeer = enclosedPeer; - this.in = ios.in; - this.out = ios.out; - this.channel = ios.in instanceof ReadableByteChannel ? - (ReadableByteChannel)ios.in : null; - } - - @Override - public ReadableByteChannel getInputStreamChannel() { - return channel; - } - - @Override - public void setReadTimeout(int timeoutMs) throws IOException { - enclosedPeer.setReadTimeout(timeoutMs); - } - - @Override - public int getReceiveBufferSize() throws IOException { - return enclosedPeer.getReceiveBufferSize(); - } - - @Override - public boolean getTcpNoDelay() throws IOException { - return enclosedPeer.getTcpNoDelay(); - } - - @Override - public void setWriteTimeout(int timeoutMs) throws IOException { - enclosedPeer.setWriteTimeout(timeoutMs); - } - - @Override - public boolean isClosed() { - return enclosedPeer.isClosed(); - } - - @Override - public void close() throws IOException { - try { - in.close(); - } finally { - try { - out.close(); - } finally { - enclosedPeer.close(); - } - } - } - - @Override - public String getRemoteAddressString() { - return enclosedPeer.getRemoteAddressString(); - } - - @Override - public String getLocalAddressString() { - return enclosedPeer.getLocalAddressString(); - } - - @Override - public InputStream getInputStream() throws IOException { - return in; - } - - @Override - public OutputStream getOutputStream() throws IOException { - return out; - } - - @Override - public boolean isLocal() { - return enclosedPeer.isLocal(); - } - - @Override - public String toString() { - return "EncryptedPeer(" + enclosedPeer + ")"; - } - - @Override - public DomainSocket getDomainSocket() { - return enclosedPeer.getDomainSocket(); - } - - @Override - public boolean hasSecureChannel() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java deleted file mode 100644 index 5bb4f56..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.net; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.channels.ReadableByteChannel; - -import org.apache.hadoop.net.SocketInputStream; -import org.apache.hadoop.net.SocketOutputStream; -import org.apache.hadoop.net.unix.DomainSocket; - -/** - * Represents a peer that we communicate with by using non-blocking I/O - * on a Socket. - */ -class NioInetPeer implements Peer { - private final Socket socket; - - /** - * An InputStream which simulates blocking I/O with timeouts using NIO. - */ - private final SocketInputStream in; - - /** - * An OutputStream which simulates blocking I/O with timeouts using NIO. - */ - private final SocketOutputStream out; - - private final boolean isLocal; - - NioInetPeer(Socket socket) throws IOException { - this.socket = socket; - this.in = new SocketInputStream(socket.getChannel(), 0); - this.out = new SocketOutputStream(socket.getChannel(), 0); - this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); - } - - @Override - public ReadableByteChannel getInputStreamChannel() { - return in; - } - - @Override - public void setReadTimeout(int timeoutMs) throws IOException { - in.setTimeout(timeoutMs); - } - - @Override - public int getReceiveBufferSize() throws IOException { - return socket.getReceiveBufferSize(); - } - - @Override - public boolean getTcpNoDelay() throws IOException { - return socket.getTcpNoDelay(); - } - - @Override - public void setWriteTimeout(int timeoutMs) throws IOException { - out.setTimeout(timeoutMs); - } - - @Override - public boolean isClosed() { - return socket.isClosed(); - } - - @Override - public void close() throws IOException { - // We always close the outermost streams-- in this case, 'in' and 'out' - // Closing either one of these will also close the Socket. - try { - in.close(); - } finally { - out.close(); - } - } - - @Override - public String getRemoteAddressString() { - return socket.getRemoteSocketAddress().toString(); - } - - @Override - public String getLocalAddressString() { - return socket.getLocalSocketAddress().toString(); - } - - @Override - public InputStream getInputStream() throws IOException { - return in; - } - - @Override - public OutputStream getOutputStream() throws IOException { - return out; - } - - @Override - public boolean isLocal() { - return isLocal; - } - - @Override - public String toString() { - return "NioInetPeer(" + socket.toString() + ")"; - } - - @Override - public DomainSocket getDomainSocket() { - return null; - } - - @Override - public boolean hasSecureChannel() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java index 2a547e0..e31e46a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java @@ -20,22 +20,15 @@ package org.apache.hadoop.hdfs.net; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.security.token.Token; @InterfaceAudience.Private public class TcpPeerServer implements PeerServer { @@ -43,60 +36,6 @@ public class TcpPeerServer implements PeerServer { private final ServerSocket serverSocket; - public static Peer peerFromSocket(Socket socket) - throws IOException { - Peer peer = null; - boolean success = false; - try { - // TCP_NODELAY is crucial here because of bad interactions between - // Nagle's Algorithm and Delayed ACKs. With connection keepalive - // between the client and DN, the conversation looks like: - // 1. Client -> DN: Read block X - // 2. DN -> Client: data for block X - // 3. Client -> DN: Status OK (successful read) - // 4. Client -> DN: Read block Y - // The fact that step #3 and #4 are both in the client->DN direction - // triggers Nagling. If the DN is using delayed ACKs, this results - // in a delay of 40ms or more. - // - // TCP_NODELAY disables nagling and thus avoids this performance - // disaster. - socket.setTcpNoDelay(true); - SocketChannel channel = socket.getChannel(); - if (channel == null) { - peer = new BasicInetPeer(socket); - } else { - peer = new NioInetPeer(socket); - } - success = true; - return peer; - } finally { - if (!success) { - if (peer != null) peer.close(); - socket.close(); - } - } - } - - public static Peer peerFromSocketAndKey( - SaslDataTransferClient saslClient, Socket s, - DataEncryptionKeyFactory keyFactory, - Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) - throws IOException { - Peer peer = null; - boolean success = false; - try { - peer = peerFromSocket(s); - peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId); - success = true; - return peer; - } finally { - if (!success) { - IOUtils.cleanup(null, peer); - } - } - } - /** * Create a non-secure TcpPeerServer. * @@ -136,7 +75,7 @@ public class TcpPeerServer implements PeerServer { @Override public Peer accept() throws IOException, SocketTimeoutException { - Peer peer = peerFromSocket(serverSocket.accept()); + Peer peer = DFSUtilClient.peerFromSocket(serverSocket.accept()); return peer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java deleted file mode 100644 index 23407f8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * A little struct class to wrap an InputStream and an OutputStream. - */ [email protected] -public class IOStreamPair { - public final InputStream in; - public final OutputStream out; - - public IOStreamPair(InputStream in, OutputStream out) { - this.in = in; - this.out = out; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java deleted file mode 100644 index 9e6a43d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer; - -import java.net.InetAddress; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.util.ReflectionUtils; - -/** - * Class used to indicate whether a channel is trusted or not. - * The default implementation is to return false indicating that - * the channel is not trusted. - * This class can be overridden to provide custom logic to determine - * whether a channel is trusted or not. - * The custom class can be specified via configuration. - * - */ -public class TrustedChannelResolver implements Configurable { - Configuration conf; - - /** - * Returns an instance of TrustedChannelResolver. - * Looks up the configuration to see if there is custom class specified. - * @param conf - * @return TrustedChannelResolver - */ - public static TrustedChannelResolver getInstance(Configuration conf) { - Class<? extends TrustedChannelResolver> clazz = - conf.getClass( - DFSConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, - TrustedChannelResolver.class, TrustedChannelResolver.class); - return ReflectionUtils.newInstance(clazz, conf); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return conf; - } - - /** - * Return boolean value indicating whether a channel is trusted or not - * from a client's perspective. - * @return true if the channel is trusted and false otherwise. - */ - public boolean isTrusted() { - return false; - } - - - /** - * Identify boolean value indicating whether a channel is trusted or not. - * @param peerAddress address of the peer - * @return true if the channel is trusted and false otherwise. - */ - public boolean isTrusted(InetAddress peerAddress) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java deleted file mode 100644 index 959cba0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; - -/** - * Creates a new {@link DataEncryptionKey} on demand. - */ [email protected] -public interface DataEncryptionKeyFactory { - - /** - * Creates a new DataEncryptionKey. - * - * @return DataEncryptionKey newly created - * @throws IOException for any error - */ - DataEncryptionKey newDataEncryptionKey() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java deleted file mode 100644 index 852819f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java +++ /dev/null @@ -1,519 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; -import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.security.sasl.Sasl; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.crypto.CipherOption; -import org.apache.hadoop.crypto.CipherSuite; -import org.apache.hadoop.crypto.CryptoCodec; -import org.apache.hadoop.crypto.CryptoInputStream; -import org.apache.hadoop.crypto.CryptoOutputStream; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.security.SaslPropertiesResolver; -import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.net.InetAddresses; -import com.google.protobuf.ByteString; - -/** - * Utility methods implementing SASL negotiation for DataTransferProtocol. - */ [email protected] -public final class DataTransferSaslUtil { - - private static final Logger LOG = LoggerFactory.getLogger( - DataTransferSaslUtil.class); - - /** - * Delimiter for the three-part SASL username string. - */ - public static final String NAME_DELIMITER = " "; - - /** - * Sent by clients and validated by servers. We use a number that's unlikely - * to ever be sent as the value of the DATA_TRANSFER_VERSION. - */ - public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; - - /** - * Checks that SASL negotiation has completed for the given participant, and - * the negotiated quality of protection is included in the given SASL - * properties and therefore acceptable. - * - * @param sasl participant to check - * @param saslProps properties of SASL negotiation - * @throws IOException for any error - */ - public static void checkSaslComplete(SaslParticipant sasl, - Map<String, String> saslProps) throws IOException { - if (!sasl.isComplete()) { - throw new IOException("Failed to complete SASL handshake"); - } - Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList( - saslProps.get(Sasl.QOP).split(","))); - String negotiatedQop = sasl.getNegotiatedQop(); - LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", - requestedQop, negotiatedQop); - if (!requestedQop.contains(negotiatedQop)) { - throw new IOException(String.format("SASL handshake completed, but " + - "channel does not have acceptable quality of protection, " + - "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); - } - } - - /** - * Check whether requested SASL Qop contains privacy. - * - * @param saslProps properties of SASL negotiation - * @return boolean true if privacy exists - */ - public static boolean requestedQopContainsPrivacy( - Map<String, String> saslProps) { - Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList( - saslProps.get(Sasl.QOP).split(","))); - return requestedQop.contains("auth-conf"); - } - - /** - * Creates SASL properties required for an encrypted SASL negotiation. - * - * @param encryptionAlgorithm to use for SASL negotation - * @return properties of encrypted SASL negotiation - */ - public static Map<String, String> createSaslPropertiesForEncryption( - String encryptionAlgorithm) { - Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); - saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); - saslProps.put(Sasl.SERVER_AUTH, "true"); - saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); - return saslProps; - } - - /** - * For an encrypted SASL negotiation, encodes an encryption key to a SASL - * password. - * - * @param encryptionKey to encode - * @return key encoded as SASL password - */ - public static char[] encryptionKeyToPassword(byte[] encryptionKey) { - return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8) - .toCharArray(); - } - - /** - * Returns InetAddress from peer. The getRemoteAddressString has the form - * [host][/ip-address]:port. The host may be missing. The IP address (and - * preceding '/') may be missing. The port preceded by ':' is always present. - * - * @param peer - * @return InetAddress from peer - */ - public static InetAddress getPeerAddress(Peer peer) { - String remoteAddr = peer.getRemoteAddressString().split(":")[0]; - int slashIdx = remoteAddr.indexOf('/'); - return InetAddresses.forString(slashIdx != -1 ? - remoteAddr.substring(slashIdx + 1, remoteAddr.length()) : - remoteAddr); - } - - /** - * Creates a SaslPropertiesResolver from the given configuration. This method - * works by cloning the configuration, translating configuration properties - * specific to DataTransferProtocol to what SaslPropertiesResolver expects, - * and then delegating to SaslPropertiesResolver for initialization. This - * method returns null if SASL protection has not been configured for - * DataTransferProtocol. - * - * @param conf configuration to read - * @return SaslPropertiesResolver for DataTransferProtocol, or null if not - * configured - */ - public static SaslPropertiesResolver getSaslPropertiesResolver( - Configuration conf) { - String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); - if (qops == null || qops.isEmpty()) { - LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " + - "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY); - return null; - } - Configuration saslPropsResolverConf = new Configuration(conf); - saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops); - Class<? extends SaslPropertiesResolver> resolverClass = conf.getClass( - HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, - SaslPropertiesResolver.class, SaslPropertiesResolver.class); - resolverClass = conf.getClass(DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, - resolverClass, SaslPropertiesResolver.class); - saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, - resolverClass, SaslPropertiesResolver.class); - SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance( - saslPropsResolverConf); - LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " + - "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops, - DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass); - return resolver; - } - - /** - * Reads a SASL negotiation message. - * - * @param in stream to read - * @return bytes of SASL negotiation messsage - * @throws IOException for any error - */ - public static byte[] readSaslMessage(InputStream in) throws IOException { - DataTransferEncryptorMessageProto proto = - DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); - if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { - throw new InvalidEncryptionKeyException(proto.getMessage()); - } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { - throw new IOException(proto.getMessage()); - } else { - return proto.getPayload().toByteArray(); - } - } - - /** - * Reads a SASL negotiation message and negotiation cipher options. - * - * @param in stream to read - * @param cipherOptions list to store negotiation cipher options - * @return byte[] SASL negotiation message - * @throws IOException for any error - */ - public static byte[] readSaslMessageAndNegotiationCipherOptions( - InputStream in, List<CipherOption> cipherOptions) throws IOException { - DataTransferEncryptorMessageProto proto = - DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); - if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { - throw new InvalidEncryptionKeyException(proto.getMessage()); - } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { - throw new IOException(proto.getMessage()); - } else { - List<CipherOptionProto> optionProtos = proto.getCipherOptionList(); - if (optionProtos != null) { - for (CipherOptionProto optionProto : optionProtos) { - cipherOptions.add(PBHelper.convert(optionProto)); - } - } - return proto.getPayload().toByteArray(); - } - } - - /** - * Negotiate a cipher option which server supports. - * - * @param conf the configuration - * @param options the cipher options which client supports - * @return CipherOption negotiated cipher option - */ - public static CipherOption negotiateCipherOption(Configuration conf, - List<CipherOption> options) throws IOException { - // Negotiate cipher suites if configured. Currently, the only supported - // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple - // values for future expansion. - String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); - if (cipherSuites == null || cipherSuites.isEmpty()) { - return null; - } - if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { - throw new IOException(String.format("Invalid cipher suite, %s=%s", - DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); - } - if (options != null) { - for (CipherOption option : options) { - CipherSuite suite = option.getCipherSuite(); - if (suite == CipherSuite.AES_CTR_NOPADDING) { - int keyLen = conf.getInt( - DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_KEY, - DFS_ENCRYPT_DATA_TRANSFER_CIPHER_KEY_BITLENGTH_DEFAULT) / 8; - CryptoCodec codec = CryptoCodec.getInstance(conf, suite); - byte[] inKey = new byte[keyLen]; - byte[] inIv = new byte[suite.getAlgorithmBlockSize()]; - byte[] outKey = new byte[keyLen]; - byte[] outIv = new byte[suite.getAlgorithmBlockSize()]; - codec.generateSecureRandom(inKey); - codec.generateSecureRandom(inIv); - codec.generateSecureRandom(outKey); - codec.generateSecureRandom(outIv); - return new CipherOption(suite, inKey, inIv, outKey, outIv); - } - } - } - return null; - } - - /** - * Send SASL message and negotiated cipher option to client. - * - * @param out stream to receive message - * @param payload to send - * @param option negotiated cipher option - * @throws IOException for any error - */ - public static void sendSaslMessageAndNegotiatedCipherOption( - OutputStream out, byte[] payload, CipherOption option) - throws IOException { - DataTransferEncryptorMessageProto.Builder builder = - DataTransferEncryptorMessageProto.newBuilder(); - - builder.setStatus(DataTransferEncryptorStatus.SUCCESS); - if (payload != null) { - builder.setPayload(ByteString.copyFrom(payload)); - } - if (option != null) { - builder.addCipherOption(PBHelper.convert(option)); - } - - DataTransferEncryptorMessageProto proto = builder.build(); - proto.writeDelimitedTo(out); - out.flush(); - } - - /** - * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream} - * and {@link org.apache.hadoop.crypto.CryptoOutputStream} - * - * @param conf the configuration - * @param cipherOption negotiated cipher option - * @param out underlying output stream - * @param in underlying input stream - * @param isServer is server side - * @return IOStreamPair the stream pair - * @throws IOException for any error - */ - public static IOStreamPair createStreamPair(Configuration conf, - CipherOption cipherOption, OutputStream out, InputStream in, - boolean isServer) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Creating IOStreamPair of CryptoInputStream and " + - "CryptoOutputStream."); - } - CryptoCodec codec = CryptoCodec.getInstance(conf, - cipherOption.getCipherSuite()); - byte[] inKey = cipherOption.getInKey(); - byte[] inIv = cipherOption.getInIv(); - byte[] outKey = cipherOption.getOutKey(); - byte[] outIv = cipherOption.getOutIv(); - InputStream cIn = new CryptoInputStream(in, codec, - isServer ? inKey : outKey, isServer ? inIv : outIv); - OutputStream cOut = new CryptoOutputStream(out, codec, - isServer ? outKey : inKey, isServer ? outIv : inIv); - return new IOStreamPair(cIn, cOut); - } - - /** - * Sends a SASL negotiation message indicating an error. - * - * @param out stream to receive message - * @param message to send - * @throws IOException for any error - */ - public static void sendGenericSaslErrorMessage(OutputStream out, - String message) throws IOException { - sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message); - } - - /** - * Sends a SASL negotiation message. - * - * @param out stream to receive message - * @param payload to send - * @throws IOException for any error - */ - public static void sendSaslMessage(OutputStream out, byte[] payload) - throws IOException { - sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); - } - - /** - * Send a SASL negotiation message and negotiation cipher options to server. - * - * @param out stream to receive message - * @param payload to send - * @param options cipher options to negotiate - * @throws IOException for any error - */ - public static void sendSaslMessageAndNegotiationCipherOptions( - OutputStream out, byte[] payload, List<CipherOption> options) - throws IOException { - DataTransferEncryptorMessageProto.Builder builder = - DataTransferEncryptorMessageProto.newBuilder(); - - builder.setStatus(DataTransferEncryptorStatus.SUCCESS); - if (payload != null) { - builder.setPayload(ByteString.copyFrom(payload)); - } - if (options != null) { - builder.addAllCipherOption(PBHelper.convertCipherOptions(options)); - } - - DataTransferEncryptorMessageProto proto = builder.build(); - proto.writeDelimitedTo(out); - out.flush(); - } - - /** - * Read SASL message and negotiated cipher option from server. - * - * @param in stream to read - * @return SaslResponseWithNegotiatedCipherOption SASL message and - * negotiated cipher option - * @throws IOException for any error - */ - public static SaslResponseWithNegotiatedCipherOption - readSaslMessageAndNegotiatedCipherOption(InputStream in) - throws IOException { - DataTransferEncryptorMessageProto proto = - DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); - if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { - throw new InvalidEncryptionKeyException(proto.getMessage()); - } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { - throw new IOException(proto.getMessage()); - } else { - byte[] response = proto.getPayload().toByteArray(); - List<CipherOption> options = PBHelper.convertCipherOptionProtos( - proto.getCipherOptionList()); - CipherOption option = null; - if (options != null && !options.isEmpty()) { - option = options.get(0); - } - return new SaslResponseWithNegotiatedCipherOption(response, option); - } - } - - /** - * Encrypt the key and iv of the negotiated cipher option. - * - * @param option negotiated cipher option - * @param sasl SASL participant representing server - * @return CipherOption negotiated cipher option which contains the - * encrypted key and iv - * @throws IOException for any error - */ - public static CipherOption wrap(CipherOption option, SaslParticipant sasl) - throws IOException { - if (option != null) { - byte[] inKey = option.getInKey(); - if (inKey != null) { - inKey = sasl.wrap(inKey, 0, inKey.length); - } - byte[] outKey = option.getOutKey(); - if (outKey != null) { - outKey = sasl.wrap(outKey, 0, outKey.length); - } - return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), - outKey, option.getOutIv()); - } - - return null; - } - - /** - * Decrypt the key and iv of the negotiated cipher option. - * - * @param option negotiated cipher option - * @param sasl SASL participant representing client - * @return CipherOption negotiated cipher option which contains the - * decrypted key and iv - * @throws IOException for any error - */ - public static CipherOption unwrap(CipherOption option, SaslParticipant sasl) - throws IOException { - if (option != null) { - byte[] inKey = option.getInKey(); - if (inKey != null) { - inKey = sasl.unwrap(inKey, 0, inKey.length); - } - byte[] outKey = option.getOutKey(); - if (outKey != null) { - outKey = sasl.unwrap(outKey, 0, outKey.length); - } - return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), - outKey, option.getOutIv()); - } - - return null; - } - - /** - * Sends a SASL negotiation message. - * - * @param out stream to receive message - * @param status negotiation status - * @param payload to send - * @param message to send - * @throws IOException for any error - */ - public static void sendSaslMessage(OutputStream out, - DataTransferEncryptorStatus status, byte[] payload, String message) - throws IOException { - DataTransferEncryptorMessageProto.Builder builder = - DataTransferEncryptorMessageProto.newBuilder(); - - builder.setStatus(status); - if (payload != null) { - builder.setPayload(ByteString.copyFrom(payload)); - } - if (message != null) { - builder.setMessage(message); - } - - DataTransferEncryptorMessageProto proto = builder.build(); - proto.writeDelimitedTo(out); - out.flush(); - } - - /** - * There is no reason to instantiate this class. - */ - private DataTransferSaslUtil() { - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java deleted file mode 100644 index 00b131f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java +++ /dev/null @@ -1,498 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY; -import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.Socket; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.RealmCallback; -import javax.security.sasl.RealmChoiceCallback; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.crypto.CipherOption; -import org.apache.hadoop.crypto.CipherSuite; -import org.apache.hadoop.hdfs.net.EncryptedPeer; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.security.SaslPropertiesResolver; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; - -/** - * Negotiates SASL for DataTransferProtocol on behalf of a client. There are - * two possible supported variants of SASL negotiation: either a general-purpose - * negotiation supporting any quality of protection, or a specialized - * negotiation that enforces privacy as the quality of protection using a - * cryptographically strong encryption key. - * - * This class is used in both the HDFS client and the DataNode. The DataNode - * needs it, because it acts as a client to other DataNodes during write - * pipelines and block transfers. - */ [email protected] -public class SaslDataTransferClient { - - private static final Logger LOG = LoggerFactory.getLogger( - SaslDataTransferClient.class); - - private final Configuration conf; - private final AtomicBoolean fallbackToSimpleAuth; - private final SaslPropertiesResolver saslPropsResolver; - private final TrustedChannelResolver trustedChannelResolver; - - /** - * Creates a new SaslDataTransferClient. This constructor is used in cases - * where it is not relevant to track if a secure client did a fallback to - * simple auth. For intra-cluster connections between data nodes in the same - * cluster, we can assume that all run under the same security configuration. - * - * @param conf the configuration - * @param saslPropsResolver for determining properties of SASL negotiation - * @param trustedChannelResolver for identifying trusted connections that do - * not require SASL negotiation - */ - public SaslDataTransferClient(Configuration conf, - SaslPropertiesResolver saslPropsResolver, - TrustedChannelResolver trustedChannelResolver) { - this(conf, saslPropsResolver, trustedChannelResolver, null); - } - - /** - * Creates a new SaslDataTransferClient. - * - * @param conf the configuration - * @param saslPropsResolver for determining properties of SASL negotiation - * @param trustedChannelResolver for identifying trusted connections that do - * not require SASL negotiation - * @param fallbackToSimpleAuth checked on each attempt at general SASL - * handshake, if true forces use of simple auth - */ - public SaslDataTransferClient(Configuration conf, - SaslPropertiesResolver saslPropsResolver, - TrustedChannelResolver trustedChannelResolver, - AtomicBoolean fallbackToSimpleAuth) { - this.conf = conf; - this.fallbackToSimpleAuth = fallbackToSimpleAuth; - this.saslPropsResolver = saslPropsResolver; - this.trustedChannelResolver = trustedChannelResolver; - } - - /** - * Sends client SASL negotiation for a newly allocated socket if required. - * - * @param socket connection socket - * @param underlyingOut connection output stream - * @param underlyingIn connection input stream - * @param encryptionKeyFactory for creation of an encryption key - * @param accessToken connection block access token - * @param datanodeId ID of destination DataNode - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut, - InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, - Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) - throws IOException { - // The encryption key factory only returns a key if encryption is enabled. - DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ? - encryptionKeyFactory.newDataEncryptionKey() : null; - IOStreamPair ios = send(socket.getInetAddress(), underlyingOut, - underlyingIn, encryptionKey, accessToken, datanodeId); - return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); - } - - /** - * Sends client SASL negotiation for a peer if required. - * - * @param peer connection peer - * @param encryptionKeyFactory for creation of an encryption key - * @param accessToken connection block access token - * @param datanodeId ID of destination DataNode - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory, - Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) - throws IOException { - IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer), - peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory, - accessToken, datanodeId); - // TODO: Consider renaming EncryptedPeer to SaslPeer. - return ios != null ? new EncryptedPeer(peer, ios) : peer; - } - - /** - * Sends client SASL negotiation for a socket if required. - * - * @param socket connection socket - * @param underlyingOut connection output stream - * @param underlyingIn connection input stream - * @param encryptionKeyFactory for creation of an encryption key - * @param accessToken connection block access token - * @param datanodeId ID of destination DataNode - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut, - InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory, - Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) - throws IOException { - IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut, - underlyingIn, encryptionKeyFactory, accessToken, datanodeId); - return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); - } - - /** - * Checks if an address is already trusted and then sends client SASL - * negotiation if required. - * - * @param addr connection address - * @param underlyingOut connection output stream - * @param underlyingIn connection input stream - * @param encryptionKeyFactory for creation of an encryption key - * @param accessToken connection block access token - * @param datanodeId ID of destination DataNode - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - private IOStreamPair checkTrustAndSend(InetAddress addr, - OutputStream underlyingOut, InputStream underlyingIn, - DataEncryptionKeyFactory encryptionKeyFactory, - Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) - throws IOException { - if (!trustedChannelResolver.isTrusted() && - !trustedChannelResolver.isTrusted(addr)) { - // The encryption key factory only returns a key if encryption is enabled. - DataEncryptionKey encryptionKey = - encryptionKeyFactory.newDataEncryptionKey(); - return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, - datanodeId); - } else { - LOG.debug( - "SASL client skipping handshake on trusted connection for addr = {}, " - + "datanodeId = {}", addr, datanodeId); - return null; - } - } - - /** - * Sends client SASL negotiation if required. Determines the correct type of - * SASL handshake based on configuration. - * - * @param addr connection address - * @param underlyingOut connection output stream - * @param underlyingIn connection input stream - * @param encryptionKey for an encrypted SASL handshake - * @param accessToken connection block access token - * @param datanodeId ID of destination DataNode - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - private IOStreamPair send(InetAddress addr, OutputStream underlyingOut, - InputStream underlyingIn, DataEncryptionKey encryptionKey, - Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) - throws IOException { - if (encryptionKey != null) { - LOG.debug( - "SASL client doing encrypted handshake for addr = {}, datanodeId = {}", - addr, datanodeId); - return getEncryptedStreams(underlyingOut, underlyingIn, - encryptionKey); - } else if (!UserGroupInformation.isSecurityEnabled()) { - LOG.debug( - "SASL client skipping handshake in unsecured configuration for " - + "addr = {}, datanodeId = {}", addr, datanodeId); - return null; - } else if (SecurityUtil.isPrivilegedPort(datanodeId.getXferPort())) { - LOG.debug( - "SASL client skipping handshake in secured configuration with " - + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId); - return null; - } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { - LOG.debug( - "SASL client skipping handshake in secured configuration with " - + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId); - return null; - } else if (saslPropsResolver != null) { - LOG.debug( - "SASL client doing general handshake for addr = {}, datanodeId = {}", - addr, datanodeId); - return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken, - datanodeId); - } else { - // It's a secured cluster using non-privileged ports, but no SASL. The - // only way this can happen is if the DataNode has - // ignore.secure.ports.for.testing configured, so this is a rare edge case. - LOG.debug( - "SASL client skipping handshake in secured configuration with no SASL " - + "protection configured for addr = {}, datanodeId = {}", - addr, datanodeId); - return null; - } - } - - /** - * Sends client SASL negotiation for specialized encrypted handshake. - * - * @param underlyingOut connection output stream - * @param underlyingIn connection input stream - * @param encryptionKey for an encrypted SASL handshake - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - private IOStreamPair getEncryptedStreams(OutputStream underlyingOut, - InputStream underlyingIn, DataEncryptionKey encryptionKey) - throws IOException { - Map<String, String> saslProps = createSaslPropertiesForEncryption( - encryptionKey.encryptionAlgorithm); - - LOG.debug("Client using encryption algorithm {}", - encryptionKey.encryptionAlgorithm); - - String userName = getUserNameFromEncryptionKey(encryptionKey); - char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey); - CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, - password); - return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, - callbackHandler); - } - - /** - * The SASL username for an encrypted handshake consists of the keyId, - * blockPoolId, and nonce with the first two encoded as Strings, and the third - * encoded using Base64. The fields are each separated by a single space. - * - * @param encryptionKey the encryption key to encode as a SASL username. - * @return encoded username containing keyId, blockPoolId, and nonce - */ - private static String getUserNameFromEncryptionKey( - DataEncryptionKey encryptionKey) { - return encryptionKey.keyId + NAME_DELIMITER + - encryptionKey.blockPoolId + NAME_DELIMITER + - new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); - } - - /** - * Sets user name and password when asked by the client-side SASL object. - */ - private static final class SaslClientCallbackHandler - implements CallbackHandler { - - private final char[] password; - private final String userName; - - /** - * Creates a new SaslClientCallbackHandler. - * - * @param userName SASL user name - * @Param password SASL password - */ - public SaslClientCallbackHandler(String userName, char[] password) { - this.password = password; - this.userName = userName; - } - - @Override - public void handle(Callback[] callbacks) throws IOException, - UnsupportedCallbackException { - NameCallback nc = null; - PasswordCallback pc = null; - RealmCallback rc = null; - for (Callback callback : callbacks) { - if (callback instanceof RealmChoiceCallback) { - continue; - } else if (callback instanceof NameCallback) { - nc = (NameCallback) callback; - } else if (callback instanceof PasswordCallback) { - pc = (PasswordCallback) callback; - } else if (callback instanceof RealmCallback) { - rc = (RealmCallback) callback; - } else { - throw new UnsupportedCallbackException(callback, - "Unrecognized SASL client callback"); - } - } - if (nc != null) { - nc.setName(userName); - } - if (pc != null) { - pc.setPassword(password); - } - if (rc != null) { - rc.setText(rc.getDefaultText()); - } - } - } - - /** - * Sends client SASL negotiation for general-purpose handshake. - * - * @param addr connection address - * @param underlyingOut connection output stream - * @param underlyingIn connection input stream - * @param accessToken connection block access token - * @param datanodeId ID of destination DataNode - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - private IOStreamPair getSaslStreams(InetAddress addr, - OutputStream underlyingOut, InputStream underlyingIn, - Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) - throws IOException { - Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr); - - String userName = buildUserName(accessToken); - char[] password = buildClientPassword(accessToken); - CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, - password); - return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, - callbackHandler); - } - - /** - * Builds the client's user name for the general-purpose handshake, consisting - * of the base64-encoded serialized block access token identifier. Note that - * this includes only the token identifier, not the token itself, which would - * include the password. The password is a shared secret, and we must not - * write it on the network during the SASL authentication exchange. - * - * @param blockToken for block access - * @return SASL user name - */ - private static String buildUserName(Token<BlockTokenIdentifier> blockToken) { - return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), - Charsets.UTF_8); - } - - /** - * Calculates the password on the client side for the general-purpose - * handshake. The password consists of the block access token's password. - * - * @param blockToken for block access - * @return SASL password - */ - private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { - return new String(Base64.encodeBase64(blockToken.getPassword(), false), - Charsets.UTF_8).toCharArray(); - } - - /** - * This method actually executes the client-side SASL handshake. - * - * @param underlyingOut connection output stream - * @param underlyingIn connection input stream - * @param userName SASL user name - * @param saslProps properties of SASL negotiation - * @param callbackHandler for responding to SASL callbacks - * @return new pair of streams, wrapped after SASL negotiation - * @throws IOException for any error - */ - private IOStreamPair doSaslHandshake(OutputStream underlyingOut, - InputStream underlyingIn, String userName, Map<String, String> saslProps, - CallbackHandler callbackHandler) throws IOException { - - DataOutputStream out = new DataOutputStream(underlyingOut); - DataInputStream in = new DataInputStream(underlyingIn); - - SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName, - saslProps, callbackHandler); - - out.writeInt(SASL_TRANSFER_MAGIC_NUMBER); - out.flush(); - - try { - // Start of handshake - "initial response" in SASL terminology. - sendSaslMessage(out, new byte[0]); - - // step 1 - byte[] remoteResponse = readSaslMessage(in); - byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse); - List<CipherOption> cipherOptions = null; - if (requestedQopContainsPrivacy(saslProps)) { - // Negotiate cipher suites if configured. Currently, the only supported - // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple - // values for future expansion. - String cipherSuites = conf.get( - DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); - if (cipherSuites != null && !cipherSuites.isEmpty()) { - if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) { - throw new IOException(String.format("Invalid cipher suite, %s=%s", - DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); - } - CipherOption option = new CipherOption(CipherSuite.AES_CTR_NOPADDING); - cipherOptions = Lists.newArrayListWithCapacity(1); - cipherOptions.add(option); - } - } - sendSaslMessageAndNegotiationCipherOptions(out, localResponse, - cipherOptions); - - // step 2 (client-side only) - SaslResponseWithNegotiatedCipherOption response = - readSaslMessageAndNegotiatedCipherOption(in); - localResponse = sasl.evaluateChallengeOrResponse(response.payload); - assert localResponse == null; - - // SASL handshake is complete - checkSaslComplete(sasl, saslProps); - - CipherOption cipherOption = null; - if (sasl.isNegotiatedQopPrivacy()) { - // Unwrap the negotiated cipher option - cipherOption = unwrap(response.cipherOption, sasl); - } - - // If negotiated cipher option is not null, we will use it to create - // stream pair. - return cipherOption != null ? createStreamPair( - conf, cipherOption, underlyingOut, underlyingIn, false) : - sasl.createStreamPair(out, in); - } catch (IOException ioe) { - sendGenericSaslErrorMessage(out, ioe.getMessage()); - throw ioe; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java index f060beb..95965b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*; import java.io.ByteArrayInputStream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java deleted file mode 100644 index f14a075..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.util.Map; -import javax.security.auth.callback.CallbackHandler; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.security.SaslInputStream; -import org.apache.hadoop.security.SaslOutputStream; - -/** - * Strongly inspired by Thrift's TSaslTransport class. - * - * Used to abstract over the <code>SaslServer</code> and - * <code>SaslClient</code> classes, which share a lot of their interface, but - * unfortunately don't share a common superclass. - */ [email protected] -class SaslParticipant { - - // This has to be set as part of the SASL spec, but it don't matter for - // our purposes, but may not be empty. It's sent over the wire, so use - // a short string. - private static final String SERVER_NAME = "0"; - private static final String PROTOCOL = "hdfs"; - private static final String MECHANISM = "DIGEST-MD5"; - - // One of these will always be null. - private final SaslServer saslServer; - private final SaslClient saslClient; - - /** - * Creates a SaslParticipant wrapping a SaslServer. - * - * @param saslProps properties of SASL negotiation - * @param callbackHandler for handling all SASL callbacks - * @return SaslParticipant wrapping SaslServer - * @throws SaslException for any error - */ - public static SaslParticipant createServerSaslParticipant( - Map<String, String> saslProps, CallbackHandler callbackHandler) - throws SaslException { - return new SaslParticipant(Sasl.createSaslServer(MECHANISM, - PROTOCOL, SERVER_NAME, saslProps, callbackHandler)); - } - - /** - * Creates a SaslParticipant wrapping a SaslClient. - * - * @param userName SASL user name - * @param saslProps properties of SASL negotiation - * @param callbackHandler for handling all SASL callbacks - * @return SaslParticipant wrapping SaslClient - * @throws SaslException for any error - */ - public static SaslParticipant createClientSaslParticipant(String userName, - Map<String, String> saslProps, CallbackHandler callbackHandler) - throws SaslException { - return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM }, - userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler)); - } - - /** - * Private constructor wrapping a SaslServer. - * - * @param saslServer to wrap - */ - private SaslParticipant(SaslServer saslServer) { - this.saslServer = saslServer; - this.saslClient = null; - } - - /** - * Private constructor wrapping a SaslClient. - * - * @param saslClient to wrap - */ - private SaslParticipant(SaslClient saslClient) { - this.saslServer = null; - this.saslClient = saslClient; - } - - /** - * @see {@link SaslServer#evaluateResponse} - * @see {@link SaslClient#evaluateChallenge} - */ - public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) - throws SaslException { - if (saslClient != null) { - return saslClient.evaluateChallenge(challengeOrResponse); - } else { - return saslServer.evaluateResponse(challengeOrResponse); - } - } - - /** - * After successful SASL negotation, returns the negotiated quality of - * protection. - * - * @return negotiated quality of protection - */ - public String getNegotiatedQop() { - if (saslClient != null) { - return (String) saslClient.getNegotiatedProperty(Sasl.QOP); - } else { - return (String) saslServer.getNegotiatedProperty(Sasl.QOP); - } - } - - /** - * After successful SASL negotiation, returns whether it's QOP privacy - * - * @return boolean whether it's QOP privacy - */ - public boolean isNegotiatedQopPrivacy() { - String qop = getNegotiatedQop(); - return qop != null && "auth-conf".equalsIgnoreCase(qop); - } - - /** - * Wraps a byte array. - * - * @param bytes The array containing the bytes to wrap. - * @param off The starting position at the array - * @param len The number of bytes to wrap - * @return byte[] wrapped bytes - * @throws SaslException if the bytes cannot be successfully wrapped - */ - public byte[] wrap(byte[] bytes, int off, int len) throws SaslException { - if (saslClient != null) { - return saslClient.wrap(bytes, off, len); - } else { - return saslServer.wrap(bytes, off, len); - } - } - - /** - * Unwraps a byte array. - * - * @param bytes The array containing the bytes to unwrap. - * @param off The starting position at the array - * @param len The number of bytes to unwrap - * @return byte[] unwrapped bytes - * @throws SaslException if the bytes cannot be successfully unwrapped - */ - public byte[] unwrap(byte[] bytes, int off, int len) throws SaslException { - if (saslClient != null) { - return saslClient.unwrap(bytes, off, len); - } else { - return saslServer.unwrap(bytes, off, len); - } - } - - /** - * Returns true if SASL negotiation is complete. - * - * @return true if SASL negotiation is complete - */ - public boolean isComplete() { - if (saslClient != null) { - return saslClient.isComplete(); - } else { - return saslServer.isComplete(); - } - } - - /** - * Return some input/output streams that may henceforth have their - * communication encrypted, depending on the negotiated quality of protection. - * - * @param out output stream to wrap - * @param in input stream to wrap - * @return IOStreamPair wrapping the streams - */ - public IOStreamPair createStreamPair(DataOutputStream out, - DataInputStream in) { - if (saslClient != null) { - return new IOStreamPair( - new SaslInputStream(in, saslClient), - new SaslOutputStream(out, saslClient)); - } else { - return new IOStreamPair( - new SaslInputStream(in, saslServer), - new SaslOutputStream(out, saslServer)); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed78b14e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java deleted file mode 100644 index f69441b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.protocol.datatransfer.sasl; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.crypto.CipherOption; - [email protected] -public class SaslResponseWithNegotiatedCipherOption { - final byte[] payload; - final CipherOption cipherOption; - - public SaslResponseWithNegotiatedCipherOption(byte[] payload, - CipherOption cipherOption) { - this.payload = payload; - this.cipherOption = cipherOption; - } -} \ No newline at end of file
