Repository: hadoop
Updated Branches:
  refs/heads/branch-2 c4747158b -> cd1e0930d

HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority
Blocks for Lower-Priority Blocks. (Derek Dagit and Zhe Zhang via wang)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd1e0930
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd1e0930
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd1e0930

Branch: refs/heads/branch-2
Commit: cd1e0930dccf248b98dea21fbe12be23f03619fe
Parents: c474715
Author: Andrew Wang <[email protected]>
Authored: Mon Jun 22 14:18:24 2015 -0700
Committer: Andrew Wang <[email protected]>
Committed: Mon Jun 22 14:18:33 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |   5 -
 .../blockmanagement/UnderReplicatedBlocks.java  |  66 +++---
 .../hadoop/hdfs/util/LightWeightLinkedSet.java  |  34 ++++
 .../blockmanagement/TestReplicationPolicy.java  | 195 +++++++++++++++++++
 .../hdfs/util/TestLightWeightLinkedSet.java     |  69 +++++++
 6 files changed, 331 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6e7030e..5f2fcf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -597,6 +597,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8337. Accessing httpfs via webhdfs doesn't work from a jar with
     kerberos. (Yongjun Zhang)
 
+    HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority
+    Blocks for Lower-Priority Blocks (Derek Dagit via kihwal)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
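
The hunks that follow are easier to review with the root cause in mind:
chooseUnderReplicatedBlocks() used to remember its scan position in each
priority queue as a bare integer (priorityToReplIdx), and every call site that
removed a block from neededReplications had to pair the removal with
decrementReplicationIndex() to keep that integer honest. Any uncompensated
removal left the index pointing one element past where the scan actually
stopped, so the next pass silently skipped a block, possibly a higher-priority
one. The sketch below reproduces the failure mode with plain Java collections;
IndexSkipDemo and its names are illustrative stand-ins, not the real HDFS
types.

import java.util.Iterator;
import java.util.LinkedHashSet;

// Illustration only (not HDFS code): a queue scanned via a saved integer
// index, the scheme this patch replaces with bookmarks.
public class IndexSkipDemo {

  // Skip 'replIndex' elements, then process up to 'count' more.
  static int scan(LinkedHashSet<String> queue, int replIndex, int count) {
    Iterator<String> it = queue.iterator();
    for (int i = 0; i < replIndex && it.hasNext(); i++) {
      it.next(); // skip blocks assumed to be already processed
    }
    int processed = 0;
    while (processed < count && it.hasNext()) {
      System.out.println("processing " + it.next());
      processed++;
    }
    return processed;
  }

  public static void main(String[] args) {
    LinkedHashSet<String> queue = new LinkedHashSet<String>();
    queue.add("blockA");
    queue.add("blockB");
    queue.add("blockC");

    int replIndex = 0;
    replIndex += scan(queue, replIndex, 1); // processes blockA; index -> 1

    // blockA becomes sufficiently replicated and is removed, but the caller
    // forgets the paired decrement -- the bookkeeping this patch eliminates.
    queue.remove("blockA");

    scan(queue, replIndex, 1); // skips blockB and processes blockC
  }
}

Moving the cursor into the queue itself, where remove() can see and adjust it,
removes the possibility of this drift entirely; that is the shape of every
change below.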

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7c2a0bc..53a6868 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1347,7 +1347,6 @@ public class BlockManager {
           // abandoned block or block reopened for append
           if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            neededReplications.decrementReplicationIndex(priority);
             continue;
           }
 
@@ -1377,7 +1376,6 @@ public class BlockManager {
           if ( (pendingReplications.getNumReplicas(block) > 0) ||
                (blockHasEnoughRacks(block)) ) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            neededReplications.decrementReplicationIndex(priority);
             blockLog.info("BLOCK* Removing {} from neededReplications as" +
                 " it has enough replicas", block);
             continue;
@@ -1434,7 +1432,6 @@ public class BlockManager {
         if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
           neededReplications.remove(block, priority); // remove from neededReplications
           rw.targets = null;
-          neededReplications.decrementReplicationIndex(priority);
           continue;
         }
         requiredReplication = bc.getPreferredBlockReplication();
@@ -1448,7 +1445,6 @@ public class BlockManager {
         if ( (pendingReplications.getNumReplicas(block) > 0) ||
              (blockHasEnoughRacks(block)) ) {
           neededReplications.remove(block, priority); // remove from neededReplications
-          neededReplications.decrementReplicationIndex(priority);
           rw.targets = null;
           blockLog.info("BLOCK* Removing {} from neededReplications as" +
               " it has enough replicas", block);
@@ -1481,7 +1477,6 @@ public class BlockManager {
           // remove from neededReplications
           if(numEffectiveReplicas + targets.length >= requiredReplication) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            neededReplications.decrementReplicationIndex(priority);
           }
         }
       }
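
All five BlockManager hunks are the same mechanical change, and they are the
whole caller-side story: the removal now stands alone because the cursor lives
inside the priority queue and remove() adjusts it automatically. Side by side,
using lines taken from the hunks above:

// Before: every removal had to be paired with manual index bookkeeping,
// and forgetting the second line caused blocks to be skipped.
neededReplications.remove(block, priority);
neededReplications.decrementReplicationIndex(priority);

// After: remove() advances the queue's internal bookmark itself when the
// bookmarked element is the one deleted, so there is no caller-side fixup
// left to forget.
neededReplications.remove(block, priority);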

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 0240805..49bc1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -18,12 +18,9 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -82,11 +79,9 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   /** The queue for corrupt blocks: {@value} */
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
   /** the queues themselves */
-  private final List<LightWeightLinkedSet<Block>> priorityQueues
-      = new ArrayList<LightWeightLinkedSet<Block>>();
+  private List<LightWeightLinkedSet<Block>> priorityQueues
+      = new ArrayList<LightWeightLinkedSet<Block>>(LEVEL);
 
-  /** Stores the replication index for each priority */
-  private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
   /** The number of corrupt blocks with replication factor 1 */
   private int corruptReplOneBlocks = 0;
 
@@ -94,7 +89,6 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
       priorityQueues.add(new LightWeightLinkedSet<Block>());
-      priorityToReplIdx.put(i, 0);
     }
   }
 
@@ -328,16 +322,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
       }
     }
   }
-  
+
   /**
    * Get a list of block lists to be replicated. The index of block lists
-   * represents its replication priority. Replication index will be tracked for
-   * each priority list separately in priorityToReplIdx map. Iterates through
-   * all priority lists and find the elements after replication index. Once the
-   * last priority lists reaches to end, all replication indexes will be set to
-   * 0 and start from 1st priority list to fulfill the blockToProces count.
-   *
-   * @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
+   * represents its replication priority. Iterates each block list in priority
+   * order beginning with the highest priority list. Iterators use a bookmark to
+   * resume where the previous iteration stopped. Returns when the block count
+   * is met or iteration reaches the end of the lowest priority list, in which
+   * case bookmarks for each block list are reset to the heads of their
+   * respective lists.
+   *
+   * @param blocksToProcess - number of blocks to fetch from underReplicated
+   *                          blocks.
    * @return Return a list of block lists to be replicated. The block list index
    * represents its replication priority.
    */
@@ -357,12 +353,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
     for (int priority = 0; priority < LEVEL; priority++) {
       // Go through all blocks that need replications with current priority.
       BlockIterator neededReplicationsIterator = iterator(priority);
-      Integer replIndex = priorityToReplIdx.get(priority);
-
-      // skip to the first unprocessed block, which is at replIndex
-      for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
-        neededReplicationsIterator.next();
-      }
+      // Set the iterator to the first unprocessed block at this priority level.
+      neededReplicationsIterator.setToBookmark();
 
       blocksToProcess = Math.min(blocksToProcess, size());
 
@@ -375,20 +367,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
           && neededReplicationsIterator.hasNext()) {
         Block block = neededReplicationsIterator.next();
         blocksToReplicate.get(priority).add(block);
-        replIndex++;
         blockCount++;
       }
 
       if (!neededReplicationsIterator.hasNext()
           && neededReplicationsIterator.getPriority() == LEVEL - 1) {
-        // reset all priorities replication index to 0 because there is no
-        // recently added blocks in any list.
+        // Reset all priorities' bookmarks to the beginning because there were
+        // no recently added blocks in any list.
         for (int i = 0; i < LEVEL; i++) {
-          priorityToReplIdx.put(i, 0);
+          this.priorityQueues.get(i).resetBookmark();
         }
         break;
       }
-      priorityToReplIdx.put(priority, replIndex);
     }
     return blocksToReplicate;
   }
@@ -471,15 +461,19 @@ class UnderReplicatedBlocks implements Iterable<Block> {
     int getPriority() {
       return level;
     }
-  }
 
-  /**
-   * This method is to decrement the replication index for the given priority
-   *
-   * @param priority - int priority level
-   */
-  public void decrementReplicationIndex(int priority) {
-    Integer replIdx = priorityToReplIdx.get(priority);
-    priorityToReplIdx.put(priority, --replIdx);
+    /**
+     * Sets iterator(s) to bookmarked elements.
+     */
+    private synchronized void setToBookmark() {
+      if (this.isIteratorForLevel) {
+        this.iterators.set(0, priorityQueues.get(this.level)
+            .getBookmark());
+      } else {
+        for (int i = 0; i < LEVEL; i++) {
+          this.iterators.set(i, priorityQueues.get(i).getBookmark());
+        }
+      }
+    }
   }
 }
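
Condensed, the new selection loop reads as below. This is a paraphrase for
readability, not the method verbatim: BookmarkedQueue stands in for
LightWeightLinkedSet<Block>, Chooser for UnderReplicatedBlocks, and the early
quota check mirrors pre-existing code that the hunks above leave untouched.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Paraphrase of the bookmark-based selection loop; illustrative types.
interface BookmarkedQueue<T> {
  Iterator<T> getBookmark(); // iterator resuming at the saved position
  void resetBookmark();      // move the saved position back to the head
}

class Chooser<T> {
  static final int LEVEL = 5; // number of priority queues, as in HDFS
  private final List<BookmarkedQueue<T>> queues;

  Chooser(List<BookmarkedQueue<T>> queues) {
    this.queues = queues;
  }

  List<List<T>> choose(int blocksToProcess) {
    List<List<T>> chosen = new ArrayList<List<T>>();
    for (int i = 0; i < LEVEL; i++) {
      chosen.add(new ArrayList<T>());
    }
    int blockCount = 0;
    for (int priority = 0; priority < LEVEL; priority++) {
      if (blockCount == blocksToProcess) {
        break; // quota met; leave the remaining bookmarks where they are
      }
      // Resume where the previous call stopped at this priority level.
      Iterator<T> it = queues.get(priority).getBookmark();
      while (blockCount < blocksToProcess && it.hasNext()) {
        chosen.get(priority).add(it.next());
        blockCount++;
      }
      // Reaching the end of the lowest-priority queue means a full sweep
      // finished: rewind every bookmark so the next call starts from the
      // heads again.
      if (!it.hasNext() && priority == LEVEL - 1) {
        for (BookmarkedQueue<T> q : queues) {
          q.resetBookmark();
        }
      }
    }
    return chosen;
  }
}

The reset-on-full-sweep branch preserves the pre-patch behavior of starting
over from the queue heads once every list has been drained.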

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
index f470cdd..dbd615c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
@@ -56,6 +56,8 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
   private DoubleLinkedElement<T> head;
   private DoubleLinkedElement<T> tail;
 
+  private LinkedSetIterator bookmark;
+
   /**
    * @param initCapacity
    *          Recommended size of the internal array.
@@ -69,6 +71,7 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     super(initCapacity, maxLoadFactor, minLoadFactor);
     head = null;
     tail = null;
+    bookmark = new LinkedSetIterator();
   }
 
   public LightWeightLinkedSet() {
@@ -111,6 +114,12 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     tail = le;
     if (head == null) {
       head = le;
+      bookmark.next = head;
+    }
+
+    // Update bookmark, if necessary.
+    if (bookmark.next == null) {
+      bookmark.next = le;
     }
     return true;
   }
@@ -141,6 +150,11 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     if (tail == found) {
       tail = tail.before;
     }
+
+    // Update bookmark, if necessary.
+    if (found == this.bookmark.next) {
+      this.bookmark.next = found.after;
+    }
     return found;
   }
 
@@ -262,5 +276,25 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     super.clear();
     this.head = null;
     this.tail = null;
+    this.resetBookmark();
+  }
+
+  /**
+   * Returns a new iterator starting at the bookmarked element.
+   *
+   * @return the iterator to the bookmarked element.
+   */
+  public Iterator<T> getBookmark() {
+    LinkedSetIterator toRet = new LinkedSetIterator();
+    toRet.next = this.bookmark.next;
+    this.bookmark = toRet;
+    return toRet;
+  }
+
+  /**
+   * Resets the bookmark to the beginning of the list.
+   */
+  public void resetBookmark() {
+    this.bookmark.next = this.head;
   }
 }
\ No newline at end of file
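
Three properties of the bookmark fall out of the hunks above: getBookmark()
hands back a live iterator that the set adopts as its new bookmark, so
consuming that iterator is what advances the saved position; remove() nudges
the bookmark to the following element when the bookmarked element itself is
deleted; and clear() and resetBookmark() rewind it to the head. A usage
sketch, assuming the patched class above is on the classpath:

import java.util.Iterator;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;

public class BookmarkUsage {
  public static void main(String[] args) {
    LightWeightLinkedSet<Integer> set = new LightWeightLinkedSet<Integer>();
    for (int i = 0; i < 5; i++) {
      set.add(i); // insertion order is preserved: 0, 1, 2, 3, 4
    }

    // Consume two elements through a bookmark iterator.
    Iterator<Integer> it = set.getBookmark();
    it.next(); // 0
    it.next(); // 1

    // A fresh bookmark iterator resumes where the last one stopped.
    Iterator<Integer> resumed = set.getBookmark();
    System.out.println(resumed.next()); // prints 2; bookmark now at 3

    // Removing the bookmarked element advances the bookmark past it.
    set.remove(3);
    System.out.println(set.getBookmark().next()); // prints 4

    // resetBookmark() rewinds to the head.
    set.resetBookmark();
    System.out.println(set.getBookmark().next()); // prints 0
  }
}

Because the set adopts each returned iterator, two callers holding "their own"
bookmark iterator would in fact share one cursor; the single caller here,
chooseUnderReplicatedBlocks(), is a synchronized method, which keeps that
sharing safe.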

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index cc2b732..0557655 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -1100,4 +1103,196 @@ public class TestReplicationPolicy {
     exception.expect(IllegalArgumentException.class);
     blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
   }
+
+  @Test(timeout = 60000)
+  public void testUpdateDoesNotCauseSkippedReplication() {
+    UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block3 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_VERY_UNDER_REPLICATED block
+    final int block1CurReplicas = 2;
+    final int block1ExpectedReplicas = 7;
+    underReplicatedBlocks.add(block1, block1CurReplicas, 0,
+        block1ExpectedReplicas);
+
+    // Adding QUEUE_VERY_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block2, 2, 0, 7);
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block3, 2, 0, 6);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
+
+    // Increasing the replications will move the block down a
+    // priority. This simulates a replica being completed in between checks.
+    underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
+        block1ExpectedReplicas, 1, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    // The block that was not updated should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
+  }
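
The queue placements asserted in this test, and in the three that follow, come
from the thresholds in UnderReplicatedBlocks.getPriority(). Paraphrased from
this branch (the numbering matches the QUEUE_* constants shown earlier; the
source is authoritative):

// Placement rules, paraphrased and evaluated roughly in this order:
//   curReplicas >= expectedReplicas          -> QUEUE_REPLICAS_BADLY_DISTRIBUTED (3)
//   curReplicas == 0 && decommissioned > 0   -> QUEUE_HIGHEST_PRIORITY (0)
//   curReplicas == 0 && decommissioned == 0  -> QUEUE_WITH_CORRUPT_BLOCKS (4)
//   curReplicas == 1                         -> QUEUE_HIGHEST_PRIORITY (0)
//   curReplicas * 3 < expectedReplicas       -> QUEUE_VERY_UNDER_REPLICATED (1)
//   otherwise                                -> QUEUE_UNDER_REPLICATED (2)
//
// Hence add(block1, 2, 0, 7) above lands in queue 1 (2 * 3 = 6 < 7),
// add(block3, 2, 0, 6) lands in queue 2 (6 is not strictly less than 6),
// and the add(blockN, 0, 1, 1) calls in the tests below land in queue 0.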
+
+  @Test(timeout = 60000)
+  public void testAddStoredBlockDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    when(mockNS.hasWriteLock()).thenReturn(true);
+    BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    // Adding this block collection to the BlockManager, so that when we add
+    // the block under construction, the BlockManager will realize the expected
+    // replication has been achieved and remove it from the under-replicated
+    // queue.
+    BlockInfoUnderConstruction info =
+        new BlockInfoUnderConstructionContiguous(block1, (short) 1);
+    BlockCollection bc = mock(BlockCollection.class);
+    when(bc.getPreferredBlockReplication()).thenReturn((short) 1);
+    bm.addBlockCollection(info, bc);
+
+    StatefulBlockInfo statefulBlockInfo = new StatefulBlockInfo(info,
+        block1, ReplicaState.RBW);
+
+    // Adding this block will increase its current replication, and that will
+    // remove it from the queue.
+    bm.addStoredBlockUnderConstruction(statefulBlockInfo,
+        TestReplicationPolicy.storages[0]);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void
+      testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    final BlockInfo info = new BlockInfoContiguous(block1, (short) 1);
+    final BlockCollection mbc = mock(BlockCollection.class);
+    when(mbc.getLastBlock()).thenReturn(info);
+    when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
+    when(mbc.getPreferredBlockReplication()).thenReturn((short) 1);
+    ContentSummary cs = mock(ContentSummary.class);
+    when(cs.getLength()).thenReturn((long) 1);
+    when(mbc.computeContentSummary(bm.getStoragePolicySuite())).thenReturn(cs);
+    info.setBlockCollection(mbc);
+    bm.addBlockCollection(info, mbc);
+
+    DatanodeStorageInfo[] dnAry = {storages[0]};
+    final BlockInfoUnderConstruction ucBlock =
+        info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
+            dnAry);
+
+    DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
+    DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
+    when(dn.isDecommissioned()).thenReturn(true);
+    when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL);
+    when(storage.getDatanodeDescriptor()).thenReturn(dn);
+    when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
+    when(storage.addBlock(any(BlockInfo.class)))
+        .thenReturn(DatanodeStorageInfo.AddBlockResult.ADDED);
+    ucBlock.addStorage(storage);
+
+    when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
+        .thenReturn(ucBlock);
+
+    bm.convertLastBlockToUnderConstruction(mbc, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    bm.setReplication((short) 0, (short) 1, "", block1);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
index e8b365a..f923920 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
@@ -325,10 +325,19 @@ public class TestLightWeightLinkedSet {
     assertEquals(NUM, set.size());
     assertFalse(set.isEmpty());
 
+    // Advance the bookmark.
+    Iterator<Integer> bkmrkIt = set.getBookmark();
+    for (int i=0; i<set.size()/2+1; i++) {
+      bkmrkIt.next();
+    }
+    assertTrue(bkmrkIt.hasNext());
+
     // clear the set
     set.clear();
     assertEquals(0, set.size());
     assertTrue(set.isEmpty());
+    bkmrkIt = set.getBookmark();
+    assertFalse(bkmrkIt.hasNext());
 
     // poll should return an empty list
     assertEquals(0, set.pollAll().size());
@@ -363,4 +372,64 @@ public class TestLightWeightLinkedSet {
 
     LOG.info("Test capacity - DONE");
   }
+
+  @Test(timeout=60000)
+  public void testGetBookmarkReturnsBookmarkIterator() {
+    LOG.info("Test getBookmark returns proper iterator");
+    assertTrue(set.addAll(list));
+
+    Iterator<Integer> bookmark = set.getBookmark();
+    assertEquals(bookmark.next(), list.get(0));
+
+    final int numAdvance = list.size()/2;
+    for(int i=1; i<numAdvance; i++) {
+      bookmark.next();
+    }
+
+    Iterator<Integer> bookmark2 = set.getBookmark();
+    assertEquals(bookmark2.next(), list.get(numAdvance));
+  }
+
+  @Test(timeout=60000)
+  public void testBookmarkAdvancesOnRemoveOfSameElement() {
+    LOG.info("Test that the bookmark advances if we remove its element.");
+    assertTrue(set.add(list.get(0)));
+    assertTrue(set.add(list.get(1)));
+    assertTrue(set.add(list.get(2)));
+
+    Iterator<Integer> it = set.getBookmark();
+    assertEquals(it.next(), list.get(0));
+    set.remove(list.get(1));
+    it = set.getBookmark();
+    assertEquals(it.next(), list.get(2));
+  }
+
+  @Test(timeout=60000)
+  public void testBookmarkSetToHeadOnAddToEmpty() {
+    LOG.info("Test bookmark is set after adding to previously empty set.");
+    Iterator<Integer> it = set.getBookmark();
+    assertFalse(it.hasNext());
+    set.add(list.get(0));
+    set.add(list.get(1));
+
+    it = set.getBookmark();
+    assertTrue(it.hasNext());
+    assertEquals(it.next(), list.get(0));
+    assertEquals(it.next(), list.get(1));
+    assertFalse(it.hasNext());
+  }
+
+  @Test(timeout=60000)
+  public void testResetBookmarkPlacesBookmarkAtHead() {
+    set.addAll(list);
+    Iterator<Integer> it = set.getBookmark();
+    final int numAdvance = set.size()/2;
+    for (int i=0; i<numAdvance; i++) {
+      it.next();
+    }
+    assertEquals(it.next(), list.get(numAdvance));
+
+    set.resetBookmark();
+    it = set.getBookmark();
+    assertEquals(it.next(), list.get(0));
+  }
 }
\ No newline at end of file
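
Putting the two halves together, the behavior the TestReplicationPolicy cases
guard is simply that consecutive calls make forward progress. A hypothetical
walk-through, not part of the patch: it assumes the method's pre-existing
early exit once the quota is met (unchanged by this commit), and since
UnderReplicatedBlocks is package-private, code like this has to live in
org.apache.hadoop.hdfs.server.blockmanagement, as the tests above do.

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;

// Hypothetical demo, assuming the patched classes above.
public class ChooseTwiceDemo {
  public static void main(String[] args) {
    UnderReplicatedBlocks urb = new UnderReplicatedBlocks();
    Block b1 = new Block(1);
    Block b2 = new Block(2);
    urb.add(b1, 2, 0, 7); // QUEUE_VERY_UNDER_REPLICATED
    urb.add(b2, 2, 0, 7); // QUEUE_VERY_UNDER_REPLICATED

    // The first call consumes b1 and leaves the bookmark at b2 ...
    List<List<Block>> first = urb.chooseUnderReplicatedBlocks(1);
    // ... so the second call resumes at b2 instead of rescanning (or, with
    // the old index bookkeeping and an intervening removal, skipping) it.
    List<List<Block>> second = urb.chooseUnderReplicatedBlocks(1);

    System.out.println(first.get(1).contains(b1));  // true
    System.out.println(second.get(1).contains(b2)); // true
  }
}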
