Repository: hadoop Updated Branches: refs/heads/branch-2.7 599146d10 -> 039c3a735
HDFS-10609. Uncaught InvalidEncryptionKeyException during pipeline recovery may abort downstream applications. Contributed by Wei-Chiu Chuang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/039c3a73 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/039c3a73 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/039c3a73 Branch: refs/heads/branch-2.7 Commit: 039c3a735192ac05209af89c0cc74a27c118a21f Parents: 599146d Author: Wei-Chiu Chuang <weic...@apache.org> Authored: Wed Oct 5 13:29:20 2016 -0700 Committer: Wei-Chiu Chuang <weic...@apache.org> Committed: Wed Oct 5 13:29:20 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hdfs/DFSClient.java | 5 + .../org/apache/hadoop/hdfs/DFSOutputStream.java | 147 +++- .../block/BlockPoolTokenSecretManager.java | 3 +- .../token/block/BlockTokenSecretManager.java | 6 + .../hadoop/hdfs/server/datanode/DataNode.java | 5 + .../hadoop/hdfs/TestEncryptedTransfer.java | 719 ++++++++----------- 6 files changed, 421 insertions(+), 464 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 58d93cd..1a6a96b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -2188,6 +2188,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } + @VisibleForTesting + public DataEncryptionKey getEncryptionKey() { + return encryptionKey; + } + /** * Get the checksum of the whole file of a range of the file. Note that the * range always starts from the beginning of the file. http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index f8c8592..ef8aa5a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -225,6 +225,89 @@ public class DFSOutputStream extends FSOutputSummer // if them are received, the DataStreamer closes the current block. // class DataStreamer extends Daemon { + private class RefetchEncryptionKeyPolicy { + private int fetchEncryptionKeyTimes = 0; + private InvalidEncryptionKeyException lastException; + private final DatanodeInfo src; + + RefetchEncryptionKeyPolicy(DatanodeInfo src) { + this.src = src; + } + boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException { + if (fetchEncryptionKeyTimes >= 2) { + // hit the same exception twice connecting to the node, so + // throw the exception and exclude the node. + throw lastException; + } + // Don't exclude this node just yet. + // Try again with a new encryption key. + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + + this.src + ": ", lastException); + // The encryption key used is invalid. + dfsClient.clearDataEncryptionKey(); + return true; + } + + /** + * Record a connection exception. + * @param e + * @throws InvalidEncryptionKeyException + */ + void recordFailure(final InvalidEncryptionKeyException e) + throws InvalidEncryptionKeyException { + fetchEncryptionKeyTimes++; + lastException = e; + } + } + + private class StreamerStreams implements java.io.Closeable { + private Socket sock = null; + private DataOutputStream out = null; + private DataInputStream in = null; + + StreamerStreams(final DatanodeInfo src, + final long writeTimeout, final long readTimeout, + final Token<BlockTokenIdentifier> blockToken) + throws IOException { + sock = createSocketForPipeline(src, 2, dfsClient); + + OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); + InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout); + IOStreamPair saslStreams = dfsClient.saslClient + .socketSend(sock, unbufOut, unbufIn, dfsClient, blockToken, src); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; + out = new DataOutputStream(new BufferedOutputStream(unbufOut, + HdfsConstants.SMALL_BUFFER_SIZE)); + in = new DataInputStream(unbufIn); + } + + void sendTransferBlock(final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, + final Token<BlockTokenIdentifier> blockToken) throws IOException { + //send the TRANSFER_BLOCK request + new Sender(out) + .transferBlock(block, blockToken, dfsClient.clientName, targets, + targetStorageTypes); + out.flush(); + //ack + BlockOpResponseProto transferResponse = BlockOpResponseProto + .parseFrom(PBHelper.vintPrefixed(in)); + if (SUCCESS != transferResponse.getStatus()) { + throw new IOException("Failed to add a datanode. Response status: " + + transferResponse.getStatus()); + } + } + + @Override + public void close() throws IOException { + IOUtils.closeStream(in); + IOUtils.closeStream(out); + IOUtils.closeSocket(sock); + } + } + private volatile boolean streamerClosed = false; private volatile ExtendedBlock block; // its length is number of bytes acked private Token<BlockTokenIdentifier> accessToken; @@ -1010,48 +1093,38 @@ public class DFSOutputStream extends FSOutputSummer new IOException("Failed to add a node"); } + private long computeTransferWriteTimeout() { + return dfsClient.getDatanodeWriteTimeout(2); + } + private long computeTransferReadTimeout() { + // transfer timeout multiplier based on the transfer size + // One per 200 packets = 12.8MB. Minimum is 2. + int multi = 2 + + (int) (bytesSent / dfsClient.getConf().writePacketSize) / 200; + return dfsClient.getDatanodeReadTimeout(multi); + } + private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final Token<BlockTokenIdentifier> blockToken) throws IOException { //transfer replica to the new datanode - Socket sock = null; - DataOutputStream out = null; - DataInputStream in = null; - try { - sock = createSocketForPipeline(src, 2, dfsClient); - final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); - - // transfer timeout multiplier based on the transfer size - // One per 200 packets = 12.8MB. Minimum is 2. - int multi = 2 + (int)(bytesSent/dfsClient.getConf().writePacketSize)/200; - final long readTimeout = dfsClient.getDatanodeReadTimeout(multi); - - OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout); - IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, - unbufOut, unbufIn, dfsClient, blockToken, src); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsConstants.SMALL_BUFFER_SIZE)); - in = new DataInputStream(unbufIn); - - //send the TRANSFER_BLOCK request - new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, - targets, targetStorageTypes); - out.flush(); - - //ack - BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); - if (SUCCESS != response.getStatus()) { - throw new IOException("Failed to add a datanode"); + RefetchEncryptionKeyPolicy policy = new RefetchEncryptionKeyPolicy(src); + do { + StreamerStreams streams = null; + try { + final long writeTimeout = computeTransferWriteTimeout(); + final long readTimeout = computeTransferReadTimeout(); + + streams = new StreamerStreams(src, writeTimeout, readTimeout, + blockToken); + streams.sendTransferBlock(targets, targetStorageTypes, blockToken); + return; + } catch (InvalidEncryptionKeyException e) { + policy.recordFailure(e); + } finally { + IOUtils.closeStream(streams); } - } finally { - IOUtils.closeStream(in); - IOUtils.closeStream(out); - IOUtils.closeSocket(sock); - } + } while (policy.continueRetryingOrThrow()); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java index 0df7067..7e3c877 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java @@ -49,7 +49,8 @@ public class BlockPoolTokenSecretManager extends map.put(bpid, secretMgr); } - synchronized BlockTokenSecretManager get(String bpid) { + @VisibleForTesting + public synchronized BlockTokenSecretManager get(String bpid) { BlockTokenSecretManager secretMgr = map.get(bpid); if (secretMgr == null) { throw new IllegalArgumentException("Block pool " + bpid http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java index a3685ca..4d4c4bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java @@ -439,6 +439,12 @@ public class BlockTokenSecretManager extends } @VisibleForTesting + public synchronized boolean hasKey(int keyId) { + BlockKey key = allKeys.get(keyId); + return key != null; + } + + @VisibleForTesting public synchronized int getSerialNoForTesting() { return serialNo; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 3b27752..9ef23d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2487,6 +2487,11 @@ public class DataNode extends ReconfigurableBase } + @VisibleForTesting + public BlockPoolTokenSecretManager getBlockPoolTokenSecretManager() { + return blockPoolTokenSecretManager; + } + public static void secureMain(String args[], SecureResources resources) { int errorCode = 0; try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c3a73/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java index 30484d1..0ffa933 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptedTransfer.java @@ -21,32 +21,41 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.times; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeoutException; +import com.google.common.base.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.apache.log4j.Level; import org.apache.log4j.LogManager; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -72,8 +81,12 @@ public class TestEncryptedTransfer { private static final String PLAIN_TEXT = "this is very secret plain text"; private static final Path TEST_PATH = new Path("/non-encrypted-file"); - - private void setEncryptionConfigKeys(Configuration conf) { + + private MiniDFSCluster cluster = null; + private Configuration conf = null; + private FileSystem fs = null; + + private void setEncryptionConfigKeys() { conf.setBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); if (resolverClazz != null){ @@ -96,389 +109,271 @@ public class TestEncryptedTransfer { this.resolverClazz = resolverClazz; } - @Test - public void testEncryptedRead() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - - FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - FileChecksum checksum = fs.getFileChecksum(TEST_PATH); + @Before + public void setup() throws IOException { + conf = new Configuration(); + } + + @After + public void teardown() throws IOException { + if (fs != null) { fs.close(); + } + if (cluster != null) { cluster.shutdown(); - - setEncryptionConfigKeys(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .manageDataDfsDirs(false) - .manageNameDfsDirs(false) - .format(false) - .startupOption(StartupOption.REGULAR) - .build(); - - fs = getFileSystem(conf); - LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(SaslDataTransferServer.class)); - LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(DataTransferSaslUtil.class)); - try { - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - } finally { - logs.stopCapturing(); - logs1.stopCapturing(); - } - - fs.close(); - - if (resolverClazz == null) { - // Test client and server negotiate cipher option - GenericTestUtils.assertDoesNotMatch(logs.getOutput(), - "Server using cipher suite"); - // Check the IOStreamPair - GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), - "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } } } - - @Test - public void testEncryptedReadWithRC4() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - - FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - FileChecksum checksum = fs.getFileChecksum(TEST_PATH); - fs.close(); - cluster.shutdown(); - - setEncryptionConfigKeys(conf); - // It'll use 3DES by default, but we set it to rc4 here. - conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, "rc4"); - - cluster = new MiniDFSCluster.Builder(conf) - .manageDataDfsDirs(false) - .manageNameDfsDirs(false) - .format(false) - .startupOption(StartupOption.REGULAR) - .build(); - - fs = getFileSystem(conf); - LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(SaslDataTransferServer.class)); - LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(DataTransferSaslUtil.class)); - try { - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - } finally { - logs.stopCapturing(); - logs1.stopCapturing(); - } - fs.close(); + private FileChecksum writeUnencryptedAndThenRestartEncryptedCluster() + throws IOException { + cluster = new MiniDFSCluster.Builder(conf).build(); - if (resolverClazz == null) { - // Test client and server negotiate cipher option - GenericTestUtils.assertDoesNotMatch(logs.getOutput(), - "Server using cipher suite"); - // Check the IOStreamPair - GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), - "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); - } - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - @Test - public void testEncryptedReadWithAES() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, - "AES/CTR/NoPadding"); - cluster = new MiniDFSCluster.Builder(conf).build(); + fs = getFileSystem(conf); + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + FileChecksum checksum = fs.getFileChecksum(TEST_PATH); + fs.close(); + cluster.shutdown(); - FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - FileChecksum checksum = fs.getFileChecksum(TEST_PATH); - fs.close(); - cluster.shutdown(); + setEncryptionConfigKeys(); - setEncryptionConfigKeys(conf); + cluster = new MiniDFSCluster.Builder(conf) + .manageDataDfsDirs(false) + .manageNameDfsDirs(false) + .format(false) + .startupOption(StartupOption.REGULAR) + .build(); - cluster = new MiniDFSCluster.Builder(conf) - .manageDataDfsDirs(false) - .manageNameDfsDirs(false) - .format(false) - .startupOption(StartupOption.REGULAR) - .build(); + fs = getFileSystem(conf); + return checksum; + } - fs = getFileSystem(conf); - LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(SaslDataTransferServer.class)); - LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(DataTransferSaslUtil.class)); - try { - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - } finally { - logs.stopCapturing(); - logs1.stopCapturing(); - } + public void testEncryptedRead(String algorithm, String cipherSuite, + boolean matchLog, boolean readAfterRestart) throws IOException { + // set encryption algorithm and cipher suites, but don't enable transfer + // encryption yet. + conf.set(DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY, algorithm); + conf.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, + cipherSuite); - fs.close(); + FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster(); + + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(SaslDataTransferServer.class)); + LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(DataTransferSaslUtil.class)); + try { + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + } finally { + logs.stopCapturing(); + logs1.stopCapturing(); + } - if (resolverClazz == null) { + if (resolverClazz == null) { + if (matchLog) { // Test client and server negotiate cipher option - GenericTestUtils.assertMatches(logs.getOutput(), - "Server using cipher suite"); + GenericTestUtils + .assertMatches(logs.getOutput(), "Server using cipher suite"); // Check the IOStreamPair GenericTestUtils.assertMatches(logs1.getOutput(), "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); - } - } finally { - if (cluster != null) { - cluster.shutdown(); + } else { + // Test client and server negotiate cipher option + GenericTestUtils + .assertDoesNotMatch(logs.getOutput(), "Server using cipher suite"); + // Check the IOStreamPair + GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), + "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); } } - } - @Test - public void testEncryptedReadAfterNameNodeRestart() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - - FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - FileChecksum checksum = fs.getFileChecksum(TEST_PATH); - fs.close(); - cluster.shutdown(); - - setEncryptionConfigKeys(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .manageDataDfsDirs(false) - .manageNameDfsDirs(false) - .format(false) - .startupOption(StartupOption.REGULAR) - .build(); - - fs = getFileSystem(conf); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - fs.close(); - + if (readAfterRestart) { cluster.restartNameNode(); fs = getFileSystem(conf); assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - fs.close(); - } finally { - if (cluster != null) { - cluster.shutdown(); - } } } + + @Test + public void testEncryptedReadDefaultAlgorithmCipherSuite() + throws IOException { + testEncryptedRead("", "", false, false); + } + + @Test + public void testEncryptedReadWithRC4() throws IOException { + testEncryptedRead("rc4", "", false, false); + } + + @Test + public void testEncryptedReadWithAES() throws IOException { + testEncryptedRead("", "AES/CTR/NoPadding", true, false); + } + + @Test + public void testEncryptedReadAfterNameNodeRestart() throws IOException { + testEncryptedRead("", "", false, true); + } @Test public void testClientThatDoesNotSupportEncryption() throws IOException { - MiniDFSCluster cluster = null; + // Set short retry timeouts so this test runs faster + conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10); + + writeUnencryptedAndThenRestartEncryptedCluster(); + + DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs); + DFSClient spyClient = Mockito.spy(client); + Mockito.doReturn(false).when(spyClient).shouldEncryptData(); + DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient); + + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(DataNode.class)); try { - Configuration conf = new Configuration(); - // Set short retry timeouts so this test runs faster - conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10); - cluster = new MiniDFSCluster.Builder(conf).build(); - - FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - fs.close(); - cluster.shutdown(); - - setEncryptionConfigKeys(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .manageDataDfsDirs(false) - .manageNameDfsDirs(false) - .format(false) - .startupOption(StartupOption.REGULAR) - .build(); - - - fs = getFileSystem(conf); - DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs); - DFSClient spyClient = Mockito.spy(client); - Mockito.doReturn(false).when(spyClient).shouldEncryptData(); - DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient); - - LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(DataNode.class)); - try { - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - if (resolverClazz != null && !resolverClazz.endsWith("TestTrustedChannelResolver")){ - fail("Should not have been able to read without encryption enabled."); - } - } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains("Could not obtain block:", - ioe); - } finally { - logs.stopCapturing(); - } - fs.close(); - - if (resolverClazz == null) { - GenericTestUtils.assertMatches(logs.getOutput(), - "Failed to read expected encryption handshake from client at"); + if (resolverClazz != null&& + !resolverClazz.endsWith("TestTrustedChannelResolver")){ + fail("Should not have been able to read without encryption enabled."); } + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("Could not obtain block:", + ioe); } finally { - if (cluster != null) { - cluster.shutdown(); - } + logs.stopCapturing(); + } + + if (resolverClazz == null) { + GenericTestUtils.assertMatches(logs.getOutput(), + "Failed to read expected encryption handshake from client at"); } } @Test public void testLongLivedReadClientAfterRestart() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - - FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - FileChecksum checksum = fs.getFileChecksum(TEST_PATH); - fs.close(); - cluster.shutdown(); - - setEncryptionConfigKeys(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .manageDataDfsDirs(false) - .manageNameDfsDirs(false) - .format(false) - .startupOption(StartupOption.REGULAR) - .build(); - - fs = getFileSystem(conf); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - - // Restart the NN and DN, after which the client's encryption key will no - // longer be valid. - cluster.restartNameNode(); - assertTrue(cluster.restartDataNode(0)); - - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - - fs.close(); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } + FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster(); + + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + + // Restart the NN and DN, after which the client's encryption key will no + // longer be valid. + cluster.restartNameNode(); + assertTrue(cluster.restartDataNode(0)); + + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); } @Test public void testLongLivedWriteClientAfterRestart() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - setEncryptionConfigKeys(conf); - cluster = new MiniDFSCluster.Builder(conf).build(); - - FileSystem fs = getFileSystem(conf); - - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - - // Restart the NN and DN, after which the client's encryption key will no - // longer be valid. - cluster.restartNameNode(); - assertTrue(cluster.restartDataNodes()); - cluster.waitActive(); - - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - - fs.close(); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } + setEncryptionConfigKeys(); + cluster = new MiniDFSCluster.Builder(conf).build(); + + fs = getFileSystem(conf); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + // Restart the NN and DN, after which the client's encryption key will no + // longer be valid. + cluster.restartNameNode(); + assertTrue(cluster.restartDataNodes()); + cluster.waitActive(); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); } @Test public void testLongLivedClient() throws IOException, InterruptedException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster.Builder(conf).build(); - - FileSystem fs = getFileSystem(conf); - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - FileChecksum checksum = fs.getFileChecksum(TEST_PATH); - fs.close(); - cluster.shutdown(); - - setEncryptionConfigKeys(conf); - - cluster = new MiniDFSCluster.Builder(conf) - .manageDataDfsDirs(false) - .manageNameDfsDirs(false) - .format(false) - .startupOption(StartupOption.REGULAR) - .build(); - - BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager() - .getBlockTokenSecretManager(); - btsm.setKeyUpdateIntervalForTesting(2 * 1000); - btsm.setTokenLifetime(2 * 1000); - btsm.clearAllKeysForTesting(); - - fs = getFileSystem(conf); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - - // Sleep for 15 seconds, after which the encryption key will no longer be - // valid. It needs to be a few multiples of the block token lifetime, - // since several block tokens are valid at any given time (the current - // and the last two, by default.) - LOG.info("Sleeping so that encryption keys expire..."); - Thread.sleep(15 * 1000); - LOG.info("Done sleeping."); - - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); - - fs.close(); - } finally { - if (cluster != null) { - cluster.shutdown(); - } + FileChecksum checksum = writeUnencryptedAndThenRestartEncryptedCluster(); + + BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager() + .getBlockTokenSecretManager(); + btsm.setKeyUpdateIntervalForTesting(2 * 1000); + btsm.setTokenLifetime(2 * 1000); + btsm.clearAllKeysForTesting(); + + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + + // Sleep for 15 seconds, after which the encryption key will no longer be + // valid. It needs to be a few multiples of the block token lifetime, + // since several block tokens are valid at any given time (the current + // and the last two, by default.) + LOG.info("Sleeping so that encryption keys expire..."); + Thread.sleep(15 * 1000); + LOG.info("Done sleeping."); + + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + assertEquals(checksum, fs.getFileChecksum(TEST_PATH)); + } + + @Test + public void testLongLivedClientPipelineRecovery() + throws IOException, InterruptedException, TimeoutException { + if (resolverClazz != null) { + // TestTrustedChannelResolver does not use encryption keys. + return; } + // use 4 datanodes to make sure that after 1 data node is stopped, + // client only retries establishing pipeline with the 4th node. + int numDataNodes = 4; + // do not consider load factor when selecting a data node + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, + false); + setEncryptionConfigKeys(); + + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDataNodes) + .build(); + + fs = getFileSystem(conf); + DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs); + DFSClient spyClient = Mockito.spy(client); + DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient); + writeTestDataToFile(fs); + + BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager() + .getBlockTokenSecretManager(); + // Reduce key update interval and token life for testing. + btsm.setKeyUpdateIntervalForTesting(2 * 1000); + btsm.setTokenLifetime(2 * 1000); + btsm.clearAllKeysForTesting(); + + // Wait until the encryption key becomes invalid. + LOG.info("Wait until encryption keys become invalid..."); + + final DataEncryptionKey encryptionKey = spyClient.getEncryptionKey(); + List<DataNode> dataNodes = cluster.getDataNodes(); + for (final DataNode dn: dataNodes) { + GenericTestUtils.waitFor( + new Supplier<Boolean>() { + @Override + public Boolean get() { + return !dn.getBlockPoolTokenSecretManager(). + get(encryptionKey.blockPoolId) + .hasKey(encryptionKey.keyId); + } + }, 100, 30*1000 + ); + } + LOG.info("The encryption key is invalid on all nodes now."); + try(FSDataOutputStream out = fs.append(TEST_PATH)) { + DFSOutputStream dfstream = (DFSOutputStream) out.getWrappedStream(); + // shut down the first datanode in the pipeline. + DatanodeInfo[] targets = dfstream.getPipeline(); + cluster.stopDataNode(targets[0].getXferAddr()); + // write data to induce pipeline recovery + out.write(PLAIN_TEXT.getBytes()); + out.hflush(); + assertFalse("The first datanode in the pipeline was not replaced.", + Arrays.asList(dfstream.getPipeline()).contains(targets[0])); + } + // verify that InvalidEncryptionKeyException is handled properly + Mockito.verify(spyClient, times(1)).clearDataEncryptionKey(); } @Test @@ -497,104 +392,76 @@ public class TestEncryptedTransfer { } private void testEncryptedWrite(int numDns) throws IOException { - MiniDFSCluster cluster = null; + setEncryptionConfigKeys(); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build(); + + fs = getFileSystem(conf); + + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(SaslDataTransferServer.class)); + LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( + LogFactory.getLog(DataTransferSaslUtil.class)); try { - Configuration conf = new Configuration(); - setEncryptionConfigKeys(conf); - - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDns).build(); - - FileSystem fs = getFileSystem(conf); - - LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(SaslDataTransferServer.class)); - LogCapturer logs1 = GenericTestUtils.LogCapturer.captureLogs( - LogFactory.getLog(DataTransferSaslUtil.class)); - try { - writeTestDataToFile(fs); - } finally { - logs.stopCapturing(); - logs1.stopCapturing(); - } - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - fs.close(); - - if (resolverClazz == null) { - // Test client and server negotiate cipher option - GenericTestUtils.assertDoesNotMatch(logs.getOutput(), - "Server using cipher suite"); - // Check the IOStreamPair - GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), - "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); - } + writeTestDataToFile(fs); } finally { - if (cluster != null) { - cluster.shutdown(); - } + logs.stopCapturing(); + logs1.stopCapturing(); + } + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + if (resolverClazz == null) { + // Test client and server negotiate cipher option + GenericTestUtils.assertDoesNotMatch(logs.getOutput(), + "Server using cipher suite"); + // Check the IOStreamPair + GenericTestUtils.assertDoesNotMatch(logs1.getOutput(), + "Creating IOStreamPair of CryptoInputStream and CryptoOutputStream."); } } @Test public void testEncryptedAppend() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - setEncryptionConfigKeys(conf); - - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); - - FileSystem fs = getFileSystem(conf); - - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - - fs.close(); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } + setEncryptionConfigKeys(); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + + fs = getFileSystem(conf); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); } @Test public void testEncryptedAppendRequiringBlockTransfer() throws IOException { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - setEncryptionConfigKeys(conf); - - // start up 4 DNs - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); - - FileSystem fs = getFileSystem(conf); - - // Create a file with replication 3, so its block is on 3 / 4 DNs. - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - - // Shut down one of the DNs holding a block replica. - FSDataInputStream in = fs.open(TEST_PATH); - List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in); - in.close(); - assertEquals(1, locatedBlocks.size()); - assertEquals(3, locatedBlocks.get(0).getLocations().length); - DataNode dn = cluster.getDataNode(locatedBlocks.get(0).getLocations()[0].getIpcPort()); - dn.shutdown(); - - // Reopen the file for append, which will need to add another DN to the - // pipeline and in doing so trigger a block transfer. - writeTestDataToFile(fs); - assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); - - fs.close(); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } + setEncryptionConfigKeys(); + + // start up 4 DNs + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); + + fs = getFileSystem(conf); + + // Create a file with replication 3, so its block is on 3 / 4 DNs. + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); + + // Shut down one of the DNs holding a block replica. + FSDataInputStream in = fs.open(TEST_PATH); + List<LocatedBlock> locatedBlocks = DFSTestUtil.getAllBlocks(in); + in.close(); + assertEquals(1, locatedBlocks.size()); + assertEquals(3, locatedBlocks.get(0).getLocations().length); + DataNode dn = cluster.getDataNode( + locatedBlocks.get(0).getLocations()[0].getIpcPort()); + dn.shutdown(); + + // Reopen the file for append, which will need to add another DN to the + // pipeline and in doing so trigger a block transfer. + writeTestDataToFile(fs); + assertEquals(PLAIN_TEXT + PLAIN_TEXT, DFSTestUtil.readFile(fs, TEST_PATH)); } private static void writeTestDataToFile(FileSystem fs) throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org