http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 43f2992..26ed1fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.EnumSet; import java.util.List; import java.util.Random; @@ -69,28 +68,32 @@ import org.junit.Test; public class TestBlockTokenWithDFS { - private static final int BLOCK_SIZE = 1024; - private static final int FILE_SIZE = 2 * BLOCK_SIZE; + protected static int BLOCK_SIZE = 1024; + protected static int FILE_SIZE = 2 * BLOCK_SIZE; private static final String FILE_TO_READ = "/fileToRead.dat"; private static final String FILE_TO_WRITE = "/fileToWrite.dat"; private static final String FILE_TO_APPEND = "/fileToAppend.dat"; - private final byte[] rawData = new byte[FILE_SIZE]; { ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL); + } + + public static byte[] generateBytes(int fileSize){ Random r = new Random(); + byte[] rawData = new byte[fileSize]; r.nextBytes(rawData); + return rawData; } - private void createFile(FileSystem fs, Path filename) throws IOException { + private void createFile(FileSystem fs, Path filename, byte[] expected) throws IOException { FSDataOutputStream out = fs.create(filename); - out.write(rawData); + 
out.write(expected); out.close(); } // read a file using blockSeekTo() - private boolean checkFile1(FSDataInputStream in) { - byte[] toRead = new byte[FILE_SIZE]; + private boolean checkFile1(FSDataInputStream in, byte[] expected) { + byte[] toRead = new byte[expected.length]; int totalRead = 0; int nRead = 0; try { @@ -101,27 +104,27 @@ public class TestBlockTokenWithDFS { return false; } assertEquals("Cannot read file.", toRead.length, totalRead); - return checkFile(toRead); + return checkFile(toRead, expected); } // read a file using fetchBlockByteRange() - private boolean checkFile2(FSDataInputStream in) { - byte[] toRead = new byte[FILE_SIZE]; + private boolean checkFile2(FSDataInputStream in, byte[] expected) { + byte[] toRead = new byte[expected.length]; try { assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0, toRead.length)); } catch (IOException e) { return false; } - return checkFile(toRead); + return checkFile(toRead, expected); } - private boolean checkFile(byte[] fileToCheck) { - if (fileToCheck.length != rawData.length) { + private boolean checkFile(byte[] fileToCheck, byte[] expected) { + if (fileToCheck.length != expected.length) { return false; } for (int i = 0; i < fileToCheck.length; i++) { - if (fileToCheck[i] != rawData[i]) { + if (fileToCheck[i] != expected[i]) { return false; } } @@ -137,7 +140,7 @@ public class TestBlockTokenWithDFS { } // try reading a block using a BlockReader directly - private static void tryRead(final Configuration conf, LocatedBlock lblock, + protected void tryRead(final Configuration conf, LocatedBlock lblock, boolean shouldSucceed) { InetSocketAddress targetAddr = null; IOException ioe = null; @@ -148,7 +151,7 @@ public class TestBlockTokenWithDFS { targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); blockReader = new BlockReaderFactory(new DfsClientConf(conf)). 
- setFileName(BlockReaderFactory.getFileName(targetAddr, + setFileName(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())). setBlock(block). setBlockToken(lblock.getBlockToken()). @@ -205,7 +208,7 @@ public class TestBlockTokenWithDFS { } // get a conf for testing - private static Configuration getConf(int numDataNodes) { + protected Configuration getConf(int numDataNodes) { Configuration conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); @@ -241,16 +244,16 @@ public class TestBlockTokenWithDFS { SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); Path fileToAppend = new Path(FILE_TO_APPEND); FileSystem fs = cluster.getFileSystem(); - + byte[] expected = generateBytes(FILE_SIZE); // write a one-byte file FSDataOutputStream stm = writeFile(fs, fileToAppend, (short) numDataNodes, BLOCK_SIZE); - stm.write(rawData, 0, 1); + stm.write(expected, 0, 1); stm.close(); // open the file again for append stm = fs.append(fileToAppend); - int mid = rawData.length - 1; - stm.write(rawData, 1, mid - 1); + int mid = expected.length - 1; + stm.write(expected, 1, mid - 1); stm.hflush(); /* @@ -267,11 +270,11 @@ public class TestBlockTokenWithDFS { // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // append the rest of the file - stm.write(rawData, mid, rawData.length - mid); + stm.write(expected, mid, expected.length - mid); stm.close(); // check if append is successful FSDataInputStream in5 = fs.open(fileToAppend); - assertTrue(checkFile1(in5)); + assertTrue(checkFile1(in5, expected)); } finally { if (cluster != null) { cluster.shutdown(); @@ -303,11 +306,12 @@ public class TestBlockTokenWithDFS { Path fileToWrite = new Path(FILE_TO_WRITE); FileSystem fs = cluster.getFileSystem(); + byte[] expected = generateBytes(FILE_SIZE); FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes, BLOCK_SIZE); // 
write a partial block - int mid = rawData.length - 1; - stm.write(rawData, 0, mid); + int mid = expected.length - 1; + stm.write(expected, 0, mid); stm.hflush(); /* @@ -324,11 +328,11 @@ public class TestBlockTokenWithDFS { // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // write the rest of the file - stm.write(rawData, mid, rawData.length - mid); + stm.write(expected, mid, expected.length - mid); stm.close(); // check if write is successful FSDataInputStream in4 = fs.open(fileToWrite); - assertTrue(checkFile1(in4)); + assertTrue(checkFile1(in4, expected)); } finally { if (cluster != null) { cluster.shutdown(); @@ -346,125 +350,137 @@ public class TestBlockTokenWithDFS { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); assertEquals(numDataNodes, cluster.getDataNodes().size()); + doTestRead(conf, cluster, false); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } - final NameNode nn = cluster.getNameNode(); - final NamenodeProtocols nnProto = nn.getRpcServer(); - final BlockManager bm = nn.getNamesystem().getBlockManager(); - final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); + protected void doTestRead(Configuration conf, MiniDFSCluster cluster, + boolean isStriped) throws Exception { + final int numDataNodes = cluster.getDataNodes().size(); + final NameNode nn = cluster.getNameNode(); + final NamenodeProtocols nnProto = nn.getRpcServer(); + final BlockManager bm = nn.getNamesystem().getBlockManager(); + final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); - // set a short token lifetime (1 second) initially - SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); + // set a short token lifetime (1 second) initially + SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); - Path fileToRead = new Path(FILE_TO_READ); - FileSystem fs = cluster.getFileSystem(); - createFile(fs, fileToRead); + Path fileToRead = new Path(FILE_TO_READ); + 
FileSystem fs = cluster.getFileSystem(); + byte[] expected = generateBytes(FILE_SIZE); + createFile(fs, fileToRead, expected); /* * setup for testing expiration handling of cached tokens */ - // read using blockSeekTo(). Acquired tokens are cached in in1 - FSDataInputStream in1 = fs.open(fileToRead); - assertTrue(checkFile1(in1)); - // read using blockSeekTo(). Acquired tokens are cached in in2 - FSDataInputStream in2 = fs.open(fileToRead); - assertTrue(checkFile1(in2)); - // read using fetchBlockByteRange(). Acquired tokens are cached in in3 - FSDataInputStream in3 = fs.open(fileToRead); - assertTrue(checkFile2(in3)); + // read using blockSeekTo(). Acquired tokens are cached in in1 + FSDataInputStream in1 = fs.open(fileToRead); + assertTrue(checkFile1(in1,expected)); + // read using blockSeekTo(). Acquired tokens are cached in in2 + FSDataInputStream in2 = fs.open(fileToRead); + assertTrue(checkFile1(in2,expected)); + // read using fetchBlockByteRange(). Acquired tokens are cached in in3 + FSDataInputStream in3 = fs.open(fileToRead); + assertTrue(checkFile2(in3,expected)); /* * testing READ interface on DN using a BlockReader */ - DFSClient client = null; - try { - client = new DFSClient(new InetSocketAddress("localhost", + DFSClient client = null; + try { + client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); - } finally { - if (client != null) client.close(); - } - List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations( - FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks(); - LocatedBlock lblock = locatedBlocks.get(0); // first block - Token<BlockTokenIdentifier> myToken = lblock.getBlockToken(); - // verify token is not expired - assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken)); - // read with valid token, should succeed - tryRead(conf, lblock, true); + } finally { + if (client != null) client.close(); + } + List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations( + FILE_TO_READ, 0, 
FILE_SIZE).getLocatedBlocks(); + LocatedBlock lblock = locatedBlocks.get(0); // first block + // verify token is not expired + assertFalse(isBlockTokenExpired(lblock)); + // read with valid token, should succeed + tryRead(conf, lblock, true); /* * wait till myToken and all cached tokens in in1, in2 and in3 expire */ - while (!SecurityTestUtil.isBlockTokenExpired(myToken)) { - try { - Thread.sleep(10); - } catch (InterruptedException ignored) { - } + while (!isBlockTokenExpired(lblock)) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { } + } /* * continue testing READ interface on DN using a BlockReader */ - // verify token is expired - assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken)); - // read should fail - tryRead(conf, lblock, false); - // use a valid new token - lblock.setBlockToken(sm.generateToken(lblock.getBlock(), - EnumSet.of(BlockTokenIdentifier.AccessMode.READ))); - // read should succeed - tryRead(conf, lblock, true); - // use a token with wrong blockID - ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock() - .getBlockPoolId(), lblock.getBlock().getBlockId() + 1); - lblock.setBlockToken(sm.generateToken(wrongBlock, - EnumSet.of(BlockTokenIdentifier.AccessMode.READ))); - // read should fail - tryRead(conf, lblock, false); - // use a token with wrong access modes - lblock.setBlockToken(sm.generateToken(lblock.getBlock(), - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE, - BlockTokenIdentifier.AccessMode.COPY, - BlockTokenIdentifier.AccessMode.REPLACE))); - // read should fail - tryRead(conf, lblock, false); - - // set a long token lifetime for future tokens - SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L); + // verify token is expired + assertTrue(isBlockTokenExpired(lblock)); + // read should fail + tryRead(conf, lblock, false); + // use a valid new token + bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ); + // read should succeed + tryRead(conf, lblock, true); + // use a token with wrong 
blockID + long rightId = lblock.getBlock().getBlockId(); + long wrongId = rightId + 1; + lblock.getBlock().setBlockId(wrongId); + bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ); + lblock.getBlock().setBlockId(rightId); + // read should fail + tryRead(conf, lblock, false); + // use a token with wrong access modes + bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.WRITE); + // read should fail + tryRead(conf, lblock, false); + + // set a long token lifetime for future tokens + SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L); /* * testing that when cached tokens are expired, DFSClient will re-fetch * tokens transparently for READ. */ - // confirm all tokens cached in in1 are expired by now - List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1); - for (LocatedBlock blk : lblocks) { - assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() is able to re-fetch token transparently - in1.seek(0); - assertTrue(checkFile1(in1)); - - // confirm all tokens cached in in2 are expired by now - List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2); - for (LocatedBlock blk : lblocks2) { - assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() is able to re-fetch token transparently (testing - // via another interface method) + // confirm all tokens cached in in1 are expired by now + List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1); + for (LocatedBlock blk : lblocks) { + assertTrue(isBlockTokenExpired(blk)); + } + // verify blockSeekTo() is able to re-fetch token transparently + in1.seek(0); + assertTrue(checkFile1(in1, expected)); + + // confirm all tokens cached in in2 are expired by now + List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2); + for (LocatedBlock blk : lblocks2) { + assertTrue(isBlockTokenExpired(blk)); + } + // verify blockSeekTo() is able to re-fetch token transparently (testing + // via another interface method) 
+ if (isStriped) { + // striped block doesn't support seekToNewSource + in2.seek(0); + } else { assertTrue(in2.seekToNewSource(0)); - assertTrue(checkFile1(in2)); + } + assertTrue(checkFile1(in2,expected)); - // confirm all tokens cached in in3 are expired by now - List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3); - for (LocatedBlock blk : lblocks3) { - assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify fetchBlockByteRange() is able to re-fetch token transparently - assertTrue(checkFile2(in3)); + // confirm all tokens cached in in3 are expired by now + List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3); + for (LocatedBlock blk : lblocks3) { + assertTrue(isBlockTokenExpired(blk)); + } + // verify fetchBlockByteRange() is able to re-fetch token transparently + assertTrue(checkFile2(in3,expected)); /* * testing that after datanodes are restarted on the same ports, cached @@ -473,37 +489,42 @@ public class TestBlockTokenWithDFS { * new tokens can be fetched from namenode). 
*/ - // restart datanodes on the same ports that they currently use - assertTrue(cluster.restartDataNodes(true)); - cluster.waitActive(); - assertEquals(numDataNodes, cluster.getDataNodes().size()); - cluster.shutdownNameNode(0); + // restart datanodes on the same ports that they currently use + assertTrue(cluster.restartDataNodes(true)); + cluster.waitActive(); + assertEquals(numDataNodes, cluster.getDataNodes().size()); + cluster.shutdownNameNode(0); - // confirm tokens cached in in1 are still valid - lblocks = DFSTestUtil.getAllBlocks(in1); - for (LocatedBlock blk : lblocks) { - assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() still works (forced to use cached tokens) - in1.seek(0); - assertTrue(checkFile1(in1)); - - // confirm tokens cached in in2 are still valid - lblocks2 = DFSTestUtil.getAllBlocks(in2); - for (LocatedBlock blk : lblocks2) { - assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() still works (forced to use cached tokens) + // confirm tokens cached in in1 are still valid + lblocks = DFSTestUtil.getAllBlocks(in1); + for (LocatedBlock blk : lblocks) { + assertFalse(isBlockTokenExpired(blk)); + } + // verify blockSeekTo() still works (forced to use cached tokens) + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + + // confirm tokens cached in in2 are still valid + lblocks2 = DFSTestUtil.getAllBlocks(in2); + for (LocatedBlock blk : lblocks2) { + assertFalse(isBlockTokenExpired(blk)); + } + + // verify blockSeekTo() still works (forced to use cached tokens) + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); + } + assertTrue(checkFile1(in2,expected)); - // confirm tokens cached in in3 are still valid - lblocks3 = DFSTestUtil.getAllBlocks(in3); - for (LocatedBlock blk : lblocks3) { - assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify fetchBlockByteRange() still 
works (forced to use cached tokens) - assertTrue(checkFile2(in3)); + // confirm tokens cached in in3 are still valid + lblocks3 = DFSTestUtil.getAllBlocks(in3); + for (LocatedBlock blk : lblocks3) { + assertFalse(isBlockTokenExpired(blk)); + } + // verify fetchBlockByteRange() still works (forced to use cached tokens) + assertTrue(checkFile2(in3,expected)); /* * testing that when namenode is restarted, cached tokens should still @@ -512,18 +533,23 @@ public class TestBlockTokenWithDFS { * setup for this test depends on the previous test. */ - // restart the namenode and then shut it down for test - cluster.restartNameNode(0); - cluster.shutdownNameNode(0); + // restart the namenode and then shut it down for test + cluster.restartNameNode(0); + cluster.shutdownNameNode(0); - // verify blockSeekTo() still works (forced to use cached tokens) - in1.seek(0); - assertTrue(checkFile1(in1)); - // verify again blockSeekTo() still works (forced to use cached tokens) + // verify blockSeekTo() still works (forced to use cached tokens) + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + // verify again blockSeekTo() still works (forced to use cached tokens) + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); - // verify fetchBlockByteRange() still works (forced to use cached tokens) - assertTrue(checkFile2(in3)); + } + assertTrue(checkFile1(in2,expected)); + + // verify fetchBlockByteRange() still works (forced to use cached tokens) + assertTrue(checkFile2(in3,expected)); /* * testing that after both namenode and datanodes got restarted (namenode @@ -532,58 +558,60 @@ public class TestBlockTokenWithDFS { * setup of this test depends on the previous test. 
*/ - // restore the cluster and restart the datanodes for test - cluster.restartNameNode(0); - assertTrue(cluster.restartDataNodes(true)); - cluster.waitActive(); - assertEquals(numDataNodes, cluster.getDataNodes().size()); - - // shutdown namenode so that DFSClient can't get new tokens from namenode - cluster.shutdownNameNode(0); - - // verify blockSeekTo() fails (cached tokens become invalid) - in1.seek(0); - assertFalse(checkFile1(in1)); - // verify fetchBlockByteRange() fails (cached tokens become invalid) - assertFalse(checkFile2(in3)); - - // restart the namenode to allow DFSClient to re-fetch tokens - cluster.restartNameNode(0); - // verify blockSeekTo() works again (by transparently re-fetching - // tokens from namenode) - in1.seek(0); - assertTrue(checkFile1(in1)); + // restore the cluster and restart the datanodes for test + cluster.restartNameNode(0); + assertTrue(cluster.restartDataNodes(true)); + cluster.waitActive(); + assertEquals(numDataNodes, cluster.getDataNodes().size()); + + // shutdown namenode so that DFSClient can't get new tokens from namenode + cluster.shutdownNameNode(0); + + // verify blockSeekTo() fails (cached tokens become invalid) + in1.seek(0); + assertFalse(checkFile1(in1,expected)); + // verify fetchBlockByteRange() fails (cached tokens become invalid) + assertFalse(checkFile2(in3,expected)); + + // restart the namenode to allow DFSClient to re-fetch tokens + cluster.restartNameNode(0); + // verify blockSeekTo() works again (by transparently re-fetching + // tokens from namenode) + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); - // verify fetchBlockByteRange() works again (by transparently - // re-fetching tokens from namenode) - assertTrue(checkFile2(in3)); + } + assertTrue(checkFile1(in2,expected)); + // verify fetchBlockByteRange() works again (by transparently + // re-fetching tokens from namenode) + 
assertTrue(checkFile2(in3,expected)); /* * testing that when datanodes are restarted on different ports, DFSClient * is able to re-fetch tokens transparently to connect to them */ - // restart datanodes on newly assigned ports - assertTrue(cluster.restartDataNodes(false)); - cluster.waitActive(); - assertEquals(numDataNodes, cluster.getDataNodes().size()); - // verify blockSeekTo() is able to re-fetch token transparently - in1.seek(0); - assertTrue(checkFile1(in1)); - // verify blockSeekTo() is able to re-fetch token transparently + // restart datanodes on newly assigned ports + assertTrue(cluster.restartDataNodes(false)); + cluster.waitActive(); + assertEquals(numDataNodes, cluster.getDataNodes().size()); + // verify blockSeekTo() is able to re-fetch token transparently + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + // verify blockSeekTo() is able to re-fetch token transparently + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); - // verify fetchBlockByteRange() is able to re-fetch token transparently - assertTrue(checkFile2(in3)); - - } finally { - if (cluster != null) { - cluster.shutdown(); - } } - } + assertTrue(checkFile1(in2,expected)); + // verify fetchBlockByteRange() is able to re-fetch token transparently + assertTrue(checkFile2(in3,expected)); + } /** * Integration testing of access token, involving NN, DN, and Balancer */ @@ -593,4 +621,8 @@ public class TestBlockTokenWithDFS { conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); new TestBalancer().integrationTest(conf); } + + protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException { + return SecurityTestUtil.isBlockTokenExpired(lb.getBlockToken()); + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java new file mode 100644 index 0000000..f985f54 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.balancer.TestBalancer; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.junit.Test; + +import java.io.IOException; + +public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS { + + private final static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private final static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final static int stripesPerBlock = 4; + private final static int numDNs = dataBlocks + parityBlocks + 2; + private static MiniDFSCluster cluster; + private static Configuration conf; + + { + BLOCK_SIZE = cellSize * stripesPerBlock; + FILE_SIZE = BLOCK_SIZE * dataBlocks * 3; + } + + private Configuration getConf() { + Configuration conf = super.getConf(numDNs); + conf.setInt("io.bytes.per.checksum", cellSize); + return conf; + } + + @Test + @Override + public void testRead() throws Exception { + conf = getConf(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient() + .createErasureCodingZone("/", null, cellSize); + try { + cluster.waitActive(); + doTestRead(conf, cluster, true); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * tested at {@link org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure#testBlockTokenExpired()} + */ + @Test + @Override + public void testWrite(){ + } + + @Test + @Override + public void testAppend() throws Exception { + //TODO: support Append for striped file + } + + @Test + @Override + public void 
testEnd2End() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + new TestBalancer().integrationTestWithStripedFile(conf); + } + + @Override + protected void tryRead(final Configuration conf, LocatedBlock lblock, + boolean shouldSucceed) { + LocatedStripedBlock lsb = (LocatedStripedBlock) lblock; + LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup + (lsb, cellSize, dataBlocks, parityBlocks); + for (LocatedBlock internalBlock : internalBlocks) { + super.tryRead(conf, internalBlock, shouldSucceed); + } + } + + @Override + protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException { + LocatedStripedBlock lsb = (LocatedStripedBlock) lb; + LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup + (lsb, cellSize, dataBlocks, parityBlocks); + for (LocatedBlock internalBlock : internalBlocks) { + if(super.isBlockTokenExpired(internalBlock)){ + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index 6fc30ba..c1218a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import 
org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.util.Time; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index cea6865..b11b48a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -185,9 +185,12 @@ public class TestNameNodePrunesMissingStorages { String datanodeUuid; // Find the first storage which this block is in. try { + BlockInfo storedBlock = + cluster.getNamesystem().getBlockManager(). + getStoredBlock(block.getLocalBlock()); Iterator<DatanodeStorageInfo> storageInfoIter = cluster.getNamesystem().getBlockManager(). 
- getStorages(block.getLocalBlock()).iterator(); + blocksMap.getStorages(storedBlock).iterator(); assertTrue(storageInfoIter.hasNext()); DatanodeStorageInfo info = storageInfoIter.next(); storageIdToRemove = info.getStorageID(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java index 1c3f075..c33667d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java @@ -100,7 +100,7 @@ public class TestNodeCount { DatanodeDescriptor nonExcessDN = null; for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) { final DatanodeDescriptor dn = storage.getDatanodeDescriptor(); - Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid()); + Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid()); if (blocks == null || !blocks.contains(block.getLocalBlock()) ) { nonExcessDN = dn; break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index 2d7bb44..83b3aa0 
100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.util.Time; import org.junit.Test; public class TestOverReplicatedBlocks { @@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks { // All replicas for deletion should be scheduled on lastDN. // And should not actually be deleted, because lastDN does not heartbeat. 
namesystem.readLock(); - Collection<Block> dnBlocks = + Collection<BlockInfo> dnBlocks = namesystem.getBlockManager().excessReplicateMap.get(lastDNid); assertEquals("Replicas on node " + lastDNid + " should have been deleted", SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 6553185..f5af898 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -1208,8 +1208,17 @@ public class TestReplicationPolicy { BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong()); - BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); + long blkID1 = ThreadLocalRandom.current().nextLong(); + if (blkID1 < 0) { + blkID1 *= -1; + } + long blkID2 = ThreadLocalRandom.current().nextLong(); + if (blkID2 < 0) { + blkID2 *= -1; + } + + BlockInfo block1 = genBlockInfo(blkID1); + BlockInfo block2 = genBlockInfo(blkID2); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 1, 1); @@ -1224,7 +1233,7 @@ public class TestReplicationPolicy { chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); - final BlockInfo info = new 
BlockInfoContiguous(block1, (short) 1); + final BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1); final BlockCollection mbc = mock(BlockCollection.class); when(mbc.getLastBlock()).thenReturn(info); when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1); @@ -1247,12 +1256,12 @@ public class TestReplicationPolicy { when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL); when(storage.getDatanodeDescriptor()).thenReturn(dn); when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true); - when(storage.addBlock(any(BlockInfo.class))).thenReturn + when(storage.addBlock(any(BlockInfoContiguous.class))).thenReturn (DatanodeStorageInfo.AddBlockResult.ADDED); - ucBlock.addStorage(storage); + ucBlock.addStorage(storage, ucBlock); - when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any())) - .thenReturn(ucBlock); + BlockInfo lastBlk = mbc.getLastBlock(); + when(mbc.getLastBlock()).thenReturn(lastBlk, ucBlock); bm.convertLastBlockToUnderConstruction(mbc, 0L); @@ -1287,7 +1296,7 @@ public class TestReplicationPolicy { chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0); - bm.setReplication((short)0, (short)1, "", block1); + bm.setReplication((short)0, (short)1, "", (BlockInfoContiguous) block1); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java new file mode 100644 index 0000000..2f2356f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSequentialBlockGroupId.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_BLOCKS_IN_GROUP; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; +import org.mockito.stubbing.Answer; + +/** + * Tests the sequential blockGroup ID generation mechanism and blockGroup ID + * collision handling. 
+ */ +public class TestSequentialBlockGroupId { + private static final Log LOG = LogFactory + .getLog("TestSequentialBlockGroupId"); + + private final short REPLICATION = 1; + private final long SEED = 0; + private final int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private final int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + + private final int stripesPerBlock = 2; + private final int blockSize = cellSize * stripesPerBlock; + private final int numDNs = dataBlocks + parityBlocks + 2; + private final int blockGrpCount = 4; + private final int fileLen = blockSize * dataBlocks * blockGrpCount; + + private MiniDFSCluster cluster; + private FileSystem fs; + private SequentialBlockGroupIdGenerator blockGrpIdGenerator; + private Path eczone = new Path("/eczone"); + + @Before + public void setup() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.waitActive(); + + fs = cluster.getFileSystem(); + blockGrpIdGenerator = cluster.getNamesystem().getBlockIdManager() + .getBlockGroupIdGenerator(); + fs.mkdirs(eczone); + cluster.getFileSystem().getClient() + .createErasureCodingZone("/eczone", null, cellSize); + } + + @After + public void teardown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test that blockGroup IDs are generating unique value. + */ + @Test(timeout = 60000) + public void testBlockGroupIdGeneration() throws IOException { + long blockGroupIdInitialValue = blockGrpIdGenerator.getCurrentValue(); + + // Create a file that is 4 blocks long. 
+ Path path = new Path(eczone, "testBlockGrpIdGeneration.dat"); + DFSTestUtil.createFile(fs, path, cellSize, fileLen, blockSize, REPLICATION, + SEED); + List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs, path); + assertThat("Wrong BlockGrps", blocks.size(), is(blockGrpCount)); + + // initialising the block group generator for verifying the block id + blockGrpIdGenerator.setCurrentValue(blockGroupIdInitialValue); + // Ensure that the block IDs are generating unique value. + for (int i = 0; i < blocks.size(); ++i) { + blockGrpIdGenerator + .skipTo((blockGrpIdGenerator.getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + + MAX_BLOCKS_IN_GROUP); + long nextBlockExpectedId = blockGrpIdGenerator.getCurrentValue(); + long nextBlockGrpId = blocks.get(i).getBlock().getBlockId(); + LOG.info("BlockGrp" + i + " id is " + nextBlockGrpId); + assertThat("BlockGrpId mismatches!", nextBlockGrpId, + is(nextBlockExpectedId)); + } + } + + /** + * Test that collisions in the blockGroup ID space are handled gracefully. + */ + @Test(timeout = 60000) + public void testTriggerBlockGroupIdCollision() throws IOException { + long blockGroupIdInitialValue = blockGrpIdGenerator.getCurrentValue(); + + // Create a file with a few blocks to rev up the global block ID + // counter. + Path path1 = new Path(eczone, "testBlockGrpIdCollisionDetection_file1.dat"); + DFSTestUtil.createFile(fs, path1, cellSize, fileLen, blockSize, + REPLICATION, SEED); + List<LocatedBlock> blocks1 = DFSTestUtil.getAllBlocks(fs, path1); + assertThat("Wrong BlockGrps", blocks1.size(), is(blockGrpCount)); + + // Rewind the block ID counter in the name system object. This will result + // in block ID collisions when we try to allocate new blocks. + blockGrpIdGenerator.setCurrentValue(blockGroupIdInitialValue); + + // Trigger collisions by creating a new file. 
+ Path path2 = new Path(eczone, "testBlockGrpIdCollisionDetection_file2.dat"); + DFSTestUtil.createFile(fs, path2, cellSize, fileLen, blockSize, + REPLICATION, SEED); + List<LocatedBlock> blocks2 = DFSTestUtil.getAllBlocks(fs, path2); + assertThat("Wrong BlockGrps", blocks2.size(), is(blockGrpCount)); + + // Make sure that file1 and file2 block IDs are different + for (LocatedBlock locBlock1 : blocks1) { + long blockId1 = locBlock1.getBlock().getBlockId(); + for (LocatedBlock locBlock2 : blocks2) { + long blockId2 = locBlock2.getBlock().getBlockId(); + assertThat("BlockGrpId mismatches!", blockId1, is(not(blockId2))); + } + } + } + + /** + * Test that collisions in the blockGroup ID when the id is occupied by legacy + * block. + */ + @Test(timeout = 60000) + public void testTriggerBlockGroupIdCollisionWithLegacyBlockId() + throws Exception { + long blockGroupIdInitialValue = blockGrpIdGenerator.getCurrentValue(); + blockGrpIdGenerator + .skipTo((blockGrpIdGenerator.getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + + MAX_BLOCKS_IN_GROUP); + final long curBlockGroupIdValue = blockGrpIdGenerator.getCurrentValue(); + + // Creates contiguous block with negative blockId so that it would trigger + // collision during blockGroup Id generation + FSNamesystem fsn = cluster.getNamesystem(); + // Replace SequentialBlockIdGenerator with a spy + SequentialBlockIdGenerator blockIdGenerator = spy(fsn.getBlockIdManager() + .getBlockIdGenerator()); + Whitebox.setInternalState(fsn.getBlockIdManager(), "blockIdGenerator", + blockIdGenerator); + SequentialBlockIdGenerator spySequentialBlockIdGenerator = new SequentialBlockIdGenerator( + null) { + @Override + public long nextValue() { + return curBlockGroupIdValue; + } + }; + final Answer<Object> delegator = new GenericTestUtils.DelegateAnswer( + spySequentialBlockIdGenerator); + doAnswer(delegator).when(blockIdGenerator).nextValue(); + + Path path1 = new Path("/testCollisionWithLegacyBlock_file1.dat"); + DFSTestUtil.createFile(fs, path1, 
1024, REPLICATION, SEED); + + List<LocatedBlock> contiguousBlocks = DFSTestUtil.getAllBlocks(fs, path1); + assertThat(contiguousBlocks.size(), is(1)); + Assert.assertEquals("Unexpected BlockId!", curBlockGroupIdValue, + contiguousBlocks.get(0).getBlock().getBlockId()); + + // Reset back to the initial value to trigger collision + blockGrpIdGenerator.setCurrentValue(blockGroupIdInitialValue); + // Trigger collisions by creating a new file. + Path path2 = new Path(eczone, "testCollisionWithLegacyBlock_file2.dat"); + DFSTestUtil.createFile(fs, path2, cellSize, fileLen, blockSize, + REPLICATION, SEED); + List<LocatedBlock> blocks2 = DFSTestUtil.getAllBlocks(fs, path2); + assertThat("Wrong BlockGrps", blocks2.size(), is(blockGrpCount)); + + // Make sure that file1 and file2 block IDs are different + for (LocatedBlock locBlock1 : contiguousBlocks) { + long blockId1 = locBlock1.getBlock().getBlockId(); + for (LocatedBlock locBlock2 : blocks2) { + long blockId2 = locBlock2.getBlock().getBlockId(); + assertThat("BlockGrpId mismatches!", blockId1, is(not(blockId2))); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java index de36e07..0f419ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import 
org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -28,10 +31,21 @@ import static org.junit.Assert.fail; public class TestUnderReplicatedBlockQueues { + private final ECSchema ecSchema = + ErasureCodingSchemaManager.getSystemDefaultSchema(); + private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private BlockInfo genBlockInfo(long id) { return new BlockInfoContiguous(new Block(id), (short) 3); } + private BlockInfo genStripedBlockInfo(long id, long numBytes) { + BlockInfoStriped sblk = new BlockInfoStriped(new Block(id), ecSchema, + CELLSIZE); + sblk.setNumBytes(numBytes); + return sblk; + } + /** * Test that adding blocks with different replication counts puts them * into different queues @@ -85,6 +99,54 @@ public class TestUnderReplicatedBlockQueues { assertEquals(2, queues.getCorruptReplOneBlockSize()); } + @Test + public void testStripedBlockPriorities() throws Throwable { + int dataBlkNum = ecSchema.getNumDataUnits(); + int parityBlkNUm = ecSchema.getNumParityUnits(); + doTestStripedBlockPriorities(1, parityBlkNUm); + doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm); + } + + private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum) + throws Throwable { + int groupSize = dataBlkNum + parityBlkNum; + long numBytes = CELLSIZE * dataBlkNum; + UnderReplicatedBlocks queues = new UnderReplicatedBlocks(); + + // add a striped block which been left NUM_DATA_BLOCKS internal blocks + BlockInfo block1 = genStripedBlockInfo(-100, numBytes); + assertAdded(queues, block1, dataBlkNum, 0, groupSize); + assertEquals(1, queues.getUnderReplicatedBlockCount()); + assertEquals(1, queues.size()); + assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY); + + // add a striped block which 
been left NUM_DATA_BLOCKS+1 internal blocks + BlockInfo block2 = genStripedBlockInfo(-200, numBytes); + assertAdded(queues, block2, dataBlkNum + 1, 0, groupSize); + assertEquals(2, queues.getUnderReplicatedBlockCount()); + assertEquals(2, queues.size()); + assertInLevel(queues, block2, + UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED); + + // add a striped block which been left NUM_DATA_BLOCKS+2 internal blocks + BlockInfo block3 = genStripedBlockInfo(-300, numBytes); + assertAdded(queues, block3, dataBlkNum + 2, 0, groupSize); + assertEquals(3, queues.getUnderReplicatedBlockCount()); + assertEquals(3, queues.size()); + assertInLevel(queues, block3, + UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED); + + // add a corrupted block + BlockInfo block_corrupt = genStripedBlockInfo(-400, numBytes); + assertEquals(0, queues.getCorruptBlockSize()); + assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize); + assertEquals(4, queues.size()); + assertEquals(3, queues.getUnderReplicatedBlockCount()); + assertEquals(1, queues.getCorruptBlockSize()); + assertInLevel(queues, block_corrupt, + UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS); + } + private void assertAdded(UnderReplicatedBlocks queues, BlockInfo block, int curReplicas, http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 6e5f07c..82b77c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -100,7 +100,7 
@@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { public static byte simulatedByte(Block b, long offsetInBlk) { byte firstByte = (byte) (b.getBlockId() & BYTE_MASK); - return (byte) ((firstByte + offsetInBlk) & BYTE_MASK); + return (byte) ((firstByte + offsetInBlk % 29) & BYTE_MASK); } public static final String CONFIG_PROPERTY_CAPACITY = http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java index 989e216..d8c651f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java @@ -227,15 +227,6 @@ public class TestIncrementalBrVariations { return new Block(10000000L, 100L, 1048576L); } - private static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock( - Block block, DatanodeStorage storage) { - ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1]; - receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, BlockStatus.RECEIVED_BLOCK, null); - StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1]; - reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks); - return reports; - } - /** * Verify that the NameNode can learn about new storages from incremental * block reports. @@ -251,8 +242,9 @@ public class TestIncrementalBrVariations { // Generate a report for a fake block on a fake storage. 
final String newStorageUuid = UUID.randomUUID().toString(); final DatanodeStorage newStorage = new DatanodeStorage(newStorageUuid); - StorageReceivedDeletedBlocks[] reports = makeReportForReceivedBlock( - getDummyBlock(), newStorage); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil. + makeReportForReceivedBlock(getDummyBlock(), BlockStatus.RECEIVED_BLOCK, + newStorage); // Send the report to the NN. cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index d3d814c..14503b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -34,11 +35,17 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import 
org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock; import org.apache.hadoop.hdfs.server.balancer.ExitStatus; import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; import org.apache.hadoop.hdfs.server.balancer.TestBalancer; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.mover.Mover.MLocation; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.test.GenericTestUtils; @@ -99,7 +106,7 @@ public class TestMover { final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); final List<MLocation> locations = MLocation.toLocations(lb); final MLocation ml = locations.get(0); - final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations); + final DBlock db = mover.newDBlock(lb, locations, null); final List<StorageType> storageTypes = new ArrayList<StorageType>( Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT)); @@ -409,4 +416,120 @@ public class TestMover { cluster.shutdown(); } } + + int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final static int stripesPerBlock = 4; + static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock; + + static void initConfWithStripe(Configuration conf) { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); + Dispatcher.setBlockMoveWaitTime(3000L); + } + + @Test(timeout = 300000) + public void testMoverWithStripedFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + 
initConfWithStripe(conf); + + // start 10 datanodes + int numOfDatanodes =10; + int storagesPerDatanode=2; + long capacity = 10 * DEFAULT_STRIPE_BLOCK_SIZE; + long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfDatanodes; i++) { + for(int j=0;j<storagesPerDatanode;j++){ + capacities[i][j]=capacity; + } + } + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .storagesPerDatanode(storagesPerDatanode) + .storageTypes(new StorageType[][]{ + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}}) + .storageCapacities(capacities) + .build(); + + try { + cluster.waitActive(); + + // set "/bar" directory with HOT storage policy. + ClientProtocol client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + String barDir = "/bar"; + client.mkdirs(barDir, new FsPermission((short) 777), true); + client.setStoragePolicy(barDir, + HdfsConstants.HOT_STORAGE_POLICY_NAME); + // set "/bar" directory with EC zone. 
+ client.createErasureCodingZone(barDir, null, 0); + + // write file to barDir + final String fooFile = "/bar/foo"; + long fileLen = 20 * DEFAULT_STRIPE_BLOCK_SIZE ; + DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), + fileLen,(short) 3, 0); + + // verify storage types and locations + LocatedBlocks locatedBlocks = + client.getBlockLocations(fooFile, 0, fileLen); + for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){ + for( StorageType type : lb.getStorageTypes()){ + Assert.assertEquals(StorageType.DISK, type); + } + } + DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + dataBlocks + parityBlocks); + + // start 5 more datanodes + numOfDatanodes +=5; + capacities = new long[5][storagesPerDatanode]; + for (int i = 0; i < 5; i++) { + for(int j=0;j<storagesPerDatanode;j++){ + capacities[i][j]=capacity; + } + } + cluster.startDataNodes(conf, 5, + new StorageType[][]{ + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}, + true, null, null, null,capacities, null, false, false, false, null); + cluster.triggerHeartbeats(); + + // move file to ARCHIVE + client.setStoragePolicy(barDir, "COLD"); + // run Mover + int rc = ToolRunner.run(conf, new Mover.Cli(), + new String[] { "-p", barDir }); + Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc); + + // verify storage types and locations + locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen); + for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){ + for( StorageType type : lb.getStorageTypes()){ + Assert.assertEquals(StorageType.ARCHIVE, type); + } + } + DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + dataBlocks + parityBlocks); + + }finally{ + cluster.shutdown(); + } + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index b314584..a2cb434 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -71,8 +71,13 @@ public class NameNodeAdapter { public static HdfsFileStatus getFileInfo(NameNode namenode, String src, boolean resolveLink) throws AccessControlException, UnresolvedLinkException, StandbyException, IOException { - return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem() - .getFSDirectory(), src, resolveLink); + namenode.getNamesystem().readLock(); + try { + return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem() + .getFSDirectory(), src, resolveLink); + } finally { + namenode.getNamesystem().readUnlock(); + } } public static boolean mkdirs(NameNode namenode, String src, http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java new file mode 100644 index 0000000..337911d --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import 
java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Tests for over-replicated internal blocks of striped block groups.
 *
 * <p>Each test writes a single-block-group file under an erasure coding zone,
 * injects internal blocks (including redundant copies) into the simulated
 * DataNodes, triggers block reports and heartbeats, and finally verifies the
 * located striped blocks reported for the file.
 */
public class TestAddOverReplicatedStripedBlocks {

  private static final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
  private static final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
  private static final short GROUP_SIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
  private static final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
  private static final int NUM_STRIPE_PER_BLOCK = 4;
  private static final int BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
  // Three spare DataNodes beyond one full block group; the tests use them to
  // hold the redundant internal blocks.
  private static final int NUM_DNS = GROUP_SIZE + 3;

  private final Path dirPath = new Path("/striped");
  private final Path filePath = new Path(dirPath, "file");

  private MiniDFSCluster cluster;
  private DistributedFileSystem fs;

  /**
   * Starts a mini cluster with {@link #NUM_DNS} simulated DataNodes and
   * creates an erasure coding zone on {@link #dirPath}.
   *
   * @throws IOException if the cluster or the zone cannot be set up
   */
  @Before
  public void setup() throws IOException {
    Configuration conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    // disable block recovery
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
    SimulatedFSDataset.setFactory(conf);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DNS).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    fs.mkdirs(dirPath);
    fs.getClient().createErasureCodingZone(dirPath.toString(), null, CELLSIZE);
  }

  /** Shuts the mini cluster down if it was started. */
  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Injects every internal block of a full block group, then over-replicates
   * two of them (with two and one redundant copies respectively) and expects
   * all {@link #GROUP_SIZE} internal blocks to be located afterwards.
   */
  @Test
  public void testProcessOverReplicatedStripedBlock() throws Exception {
    // create a file which has exactly one block group on the first
    // GROUP_SIZE DNs
    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
        NUM_STRIPE_PER_BLOCK, false);
    LocatedBlocks lbs = fetchBlockLocations(fileLen);
    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
    long gs = bg.getBlock().getGenerationStamp();
    String bpid = bg.getBlock().getBlockPoolId();
    long groupId = bg.getBlock().getBlockId();
    Block blk = new Block(groupId, BLOCK_SIZE, gs);
    for (int i = 0; i < GROUP_SIZE; i++) {
      blk.setBlockId(groupId + i);
      injectBlock(i, blk, bpid);
    }
    cluster.triggerBlockReports();

    // let an internal block be over replicated with 2 redundant blocks.
    blk.setBlockId(groupId + 2);
    injectBlock(NUM_DNS - 3, blk, bpid);
    injectBlock(NUM_DNS - 2, blk, bpid);
    // let an internal block be over replicated with 1 redundant block.
    blk.setBlockId(groupId + 6);
    injectBlock(NUM_DNS - 1, blk, bpid);

    triggerOverReplicationProcessing();

    // verify that all internal blocks exist
    lbs = fetchBlockLocations(fileLen);
    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
  }

  /**
   * Writes a file of {@code DATA_BLK_NUM - 1} cells, injects one internal
   * block into every DataNode that is not already a location of the block
   * group, and expects {@code GROUP_SIZE - 1} internal blocks to be located
   * afterwards.
   */
  @Test
  public void testProcessOverReplicatedSBSmallerThanFullBlocks()
      throws Exception {
    // Create an EC file which doesn't fill full internal blocks.
    int fileLen = CELLSIZE * (DATA_BLK_NUM - 1);
    byte[] content = new byte[fileLen];
    DFSTestUtil.writeFile(fs, filePath, new String(content));
    LocatedBlocks lbs = fetchBlockLocations(fileLen);
    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
    long gs = bg.getBlock().getGenerationStamp();
    String bpid = bg.getBlock().getBlockPoolId();
    long groupId = bg.getBlock().getBlockId();
    Block blk = new Block(groupId, BLOCK_SIZE, gs);
    cluster.triggerBlockReports();
    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());

    // let an internal block be over replicated by injecting it into every
    // DataNode that is not yet a location of the block group. Therefore the
    // number of internal blocks goes over GROUP_SIZE.
    blk.setBlockId(groupId + 2);
    List<DataNode> dataNodeList = cluster.getDataNodes();
    for (int i = 0; i < NUM_DNS; i++) {
      if (!infos.contains(dataNodeList.get(i).getDatanodeId())) {
        injectBlock(i, blk, bpid);
      }
    }

    triggerOverReplicationProcessing();

    // verify that all internal blocks exist
    lbs = fetchBlockLocations(fileLen);
    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
  }

  /**
   * Marks one internal block of a full block group as corrupt, over-replicates
   * another internal block with two redundant copies, and expects
   * {@link #GROUP_SIZE} internal blocks to be located afterwards.
   */
  @Test
  public void testProcessOverReplicatedAndCorruptStripedBlock()
      throws Exception {
    long fileLen = DATA_BLK_NUM * BLOCK_SIZE;
    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
        NUM_STRIPE_PER_BLOCK, false);
    LocatedBlocks lbs = fetchBlockLocations(fileLen);
    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
    long gs = bg.getBlock().getGenerationStamp();
    String bpid = bg.getBlock().getBlockPoolId();
    long groupId = bg.getBlock().getBlockId();
    Block blk = new Block(groupId, BLOCK_SIZE, gs);
    // Built while blk still carries the group id; blk's id is mutated below.
    // NOTE(review): relies on BlockInfoStriped not tracking later changes to
    // blk - confirm against its constructor.
    BlockInfoStriped blockInfo = new BlockInfoStriped(blk,
        ErasureCodingSchemaManager.getSystemDefaultSchema(), CELLSIZE);
    for (int i = 0; i < GROUP_SIZE; i++) {
      blk.setBlockId(groupId + i);
      injectBlock(i, blk, bpid);
    }
    cluster.triggerBlockReports();

    // let an internal block be corrupt
    BlockManager bm = cluster.getNamesystem().getBlockManager();
    List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
    List<String> storages = Arrays.asList(bg.getStorageIDs());
    cluster.getNamesystem().writeLock();
    try {
      bm.findAndMarkBlockAsCorrupt(lbs.getLastLocatedBlock().getBlock(),
          infos.get(0), storages.get(0), "TEST");
    } finally {
      cluster.getNamesystem().writeUnlock();
    }
    assertEquals(1, bm.countNodes(blockInfo).corruptReplicas());

    // let an internal block be over replicated with 2 redundant blocks.
    blk.setBlockId(groupId + 2);
    injectBlock(NUM_DNS - 3, blk, bpid);
    injectBlock(NUM_DNS - 2, blk, bpid);

    triggerOverReplicationProcessing();

    // verify that all internal blocks exist
    lbs = fetchBlockLocations(fileLen);
    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
  }

  /**
   * Injects only {@code GROUP_SIZE - 1} internal blocks (one is missing),
   * over-replicates one of them with two redundant copies, and expects
   * {@code GROUP_SIZE - 1} internal blocks to be located afterwards.
   */
  @Test
  public void testProcessOverReplicatedAndMissingStripedBlock()
      throws Exception {
    // NOTE(review): the sibling tests that use createStripedFile query
    // DATA_BLK_NUM * BLOCK_SIZE bytes; this one queries a shorter range.
    // Kept as is - confirm whether the difference is intentional.
    long fileLen = CELLSIZE * DATA_BLK_NUM;
    DFSTestUtil.createStripedFile(cluster, filePath, null, 1,
        NUM_STRIPE_PER_BLOCK, false);
    LocatedBlocks lbs = fetchBlockLocations(fileLen);
    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
    long gs = bg.getBlock().getGenerationStamp();
    String bpid = bg.getBlock().getBlockPoolId();
    long groupId = bg.getBlock().getBlockId();
    Block blk = new Block(groupId, BLOCK_SIZE, gs);
    // only inject GROUP_SIZE - 1 blocks, so there is one block missing
    for (int i = 0; i < GROUP_SIZE - 1; i++) {
      blk.setBlockId(groupId + i);
      injectBlock(i, blk, bpid);
    }
    cluster.triggerBlockReports();

    // let an internal block be over replicated with 2 redundant blocks.
    // Therefore the number of injected internal blocks is over GROUP_SIZE
    // (GROUP_SIZE - 1 distinct blocks + 2 redundant blocks > GROUP_SIZE).
    blk.setBlockId(groupId + 2);
    injectBlock(NUM_DNS - 3, blk, bpid);
    injectBlock(NUM_DNS - 2, blk, bpid);

    triggerOverReplicationProcessing();

    // Since one block is missing, when over-replicated blocks got deleted,
    // we are left with GROUP_SIZE - 1 blocks.
    lbs = fetchBlockLocations(fileLen);
    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
  }

  /** Asks the NameNode RPC server for the test file's block locations. */
  private LocatedBlocks fetchBlockLocations(long length) throws Exception {
    return cluster.getNameNodeRpc().getBlockLocations(
        filePath.toString(), 0, length);
  }

  /** Injects a single block into the DataNode at the given cluster index. */
  private void injectBlock(int dnIndex, Block blk, String bpid)
      throws Exception {
    cluster.injectBlocks(dnIndex, Arrays.asList(blk), bpid);
  }

  /**
   * Runs the report/heartbeat sequence every test uses after injecting
   * redundant blocks.
   */
  private void triggerOverReplicationProcessing() throws Exception {
    // update blocksMap
    cluster.triggerBlockReports();
    // add to invalidates
    cluster.triggerHeartbeats();
    // datanode delete block
    cluster.triggerHeartbeats();
    // update blocksMap
    cluster.triggerBlockReports();
  }

}
