[
https://issues.apache.org/jira/browse/HDFS-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13763762#comment-13763762
]
Todd Lipcon commented on HDFS-5053:
-----------------------------------
I did my review by loading the patch into my IDE and just adding comments as
Java comments. Here's the "diff" of my review:
{code}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
index 7c0123d..2d31573 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
@@ -45,6 +45,10 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+// TODO: document -- include the thread safety considerations. It seems like this
+// class isn't itself thread-safe, so needs to be protected by another lock (eg the
+// BlockManager lock?)
+// TODO: annotate
public class CacheReplicationManager {
private static final Log LOG =
@@ -87,7 +91,7 @@ public long getPendingUncacheBlocksCount() {
/**
* Blocks to be uncached
*/
- private final InvalidateCacheBlocks uncacheBlocks;
+ private final InvalidateCacheBlocks uncacheBlocks; // rename this to "blocksToUncache"
/**
* Blocks that need to be cached
*/
@@ -206,6 +210,11 @@ public void processCacheReport(final DatanodeID nodeID, final String poolId,
"processCacheReport from dead or unregistered node: " + nodeID);
}
+ // TODO: Can we simplify this somewhat by relaxing how quickly we get the cache
+ // manager up to its full state? eg maybe we never handle cache reports
+ // until we're out of safemode? Maybe that's problematic, but would be good
+ // to kill some complexity in this class, since it duplicates lots of BM.
+
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
@@ -242,10 +251,21 @@ public void processCacheReport(final DatanodeID nodeID, final String poolId,
* @param node Datanode sending a cache report
* @param report cache report
* @throws IOException
+ *
+ * TODO: can we share code somehow a little better here? Even if we have
+ * to extract some interface which both BlockManager and CacheReplicationManager
+ * implement, eg something with a newBlockReported(), existingBlockReportedGone(),
+ * etc type methods used as callbacks from the algorithm? Seems a shame to duplicate
+ * all this stuff.
*/
private void processCacheReport(final DatanodeDescriptor node,
final BlockListAsLongs report) throws IOException {
// TODO: queued processing on the standby
+ // rather than being correct, I think it's better to be simple + sloppy here, since
+ // the standby-side queueing stuff is pretty messy. How bad is the cache churn likely
+ // to be if we do it "wrong" here? Or would we get a bunch of warnings/errors trying
+ // to look up blocks which don't exist? -todd
+
// TODO: proper handling for corrupt replicas
Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
Collection<Block> toRemove = new LinkedList<Block>();
@@ -347,6 +367,12 @@ private BlockInfo processReportedCachedBlock(final DatanodeDescriptor dn,
if (storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
+ //
+ // TODO: is this actually something that could happen? Do we actually
+ // need to explicitly un-cache it, or can we just assume that it has also
+ // reported this block to the BlockManager, which will tell the DN to delete
+ // it, and thus get implicitly uncached? At the very least we should
+ // make sure that the tests get code coverage here.
toInvalidate.add(new Block(block));
return null;
}
@@ -363,6 +389,8 @@ private BlockInfo processReportedCachedBlock(final DatanodeDescriptor dn,
// TODO: queuing cache operations on the standby
return null;
} else {
+ // TODO: Probably worth adding trace logging for these cases -- both
+ // here and in the other return paths below.
toInvalidate.add(block);
return null;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 325a5ff..2195f43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -108,6 +108,8 @@ public void run() {
/**
* Assigns under replicated blocks to new datanodes based on priority.
+ * TODO: would rather use distinct terminology like "under-cached" and "over-cached" here
+ * and elsewhere in this patch, just so readers are very clear.
*/
private void computeCachingWork() {
List<List<Block>> blocksToCache = null;
@@ -132,6 +134,9 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
try {
synchronized (neededCacheBlocks) {
for (int priority = 0; priority < blocksToCache.size(); priority++) {
+ // TODO: the priority thing is useless here, right? since we always iterate
+ // through all of them? Given that, is there any benefit to using UnderReplicatedBlocks
+ // instead of something simpler like a single LightWeightLinkedSet?
for (Block block : blocksToCache.get(priority)) {
// Required number of cached replicas
requiredRepl = cacheReplManager.getCacheReplication(block);
@@ -139,6 +144,8 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
cachedNodes = cacheReplManager.getSafeReplicas(
cacheReplManager.cachedBlocksMap, block);
// Replicas that are safely stored on disk
+ // TODO: you sure the below is safe? you can reach into blockManager
+ // like this without any locking?
storedNodes = cacheReplManager.getSafeReplicas(
blockManager.blocksMap, block);
// "effective" replication factor which includes pending
@@ -149,7 +156,8 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
neededCacheBlocks.remove(block, priority);
blockLog.info("BLOCK* Removing " + block
+ " from neededCacheBlocks as it has enough replicas");
- continue;
+ // TODO: better log message clarity -- better to say "enough cached replicas" or something
+ continue;
}
// Choose some replicas to cache if needed
additionalRepl = requiredRepl - effectiveRepl;
@@ -229,6 +237,7 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
/**
* Reassign replication work that has timed out
*/
+ // TODO: rename to processPendingCaches
private void processPendingReplications() {
Block[] timedOutItems = pendingCacheBlocks.getTimedOutBlocks();
if (timedOutItems != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
index 3573b31..d6a3841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
@@ -28,17 +28,23 @@
import org.apache.commons.math.random.RandomDataImpl;
import org.apache.hadoop.hdfs.protocol.Block;
+// TODO: Annotate
public class CacheReplicationPolicy {
/**
* Prunes datanodes with insufficient capacity to cache the block.
*
* @return Pruned list of datanodes
+ // TODO: above isn't that clear -- maybe rename this to "selectSufficientCapacity"
+ // and "return the list of datanodes with sufficient capacity"? I kind of thought
+ // it was going to return the list of datanodes which _didn't_ fit (i.e. return the
+ // pruned nodes, rather than the remaining nodes post-pruning)
*/
private static List<DatanodeDescriptor> pruneInsuffientCapacity(Block block,
List<DatanodeDescriptor> targets) {
List<DatanodeDescriptor> pruned =
new ArrayList<DatanodeDescriptor>(targets.size());
+ // TODO: just use a foreach loop below?
for (Iterator<DatanodeDescriptor> it = targets.iterator(); it.hasNext();) {
DatanodeDescriptor dn = it.next();
long remaining = dn.getCacheRemaining();
@@ -67,7 +73,7 @@ private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
TreeMap<Long, DatanodeDescriptor> lottery =
new TreeMap<Long, DatanodeDescriptor>();
long totalCacheAvailable = 0;
- for (Iterator<DatanodeDescriptor> it = targets.iterator(); it.hasNext();) {
+ for (Iterator<DatanodeDescriptor> it = targets.iterator(); it.hasNext();) { // TODO: use foreach?
DatanodeDescriptor dn = it.next();
long remaining = dn.getCacheRemaining();
totalCacheAvailable += remaining;
@@ -91,7 +97,7 @@ private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
List<DatanodeDescriptor> pruned = pruneInsuffientCapacity(block, targets);
List<DatanodeDescriptor> chosen =
new ArrayList<DatanodeDescriptor>(numTargets);
- for (int i = 0; i < numTargets && pruned.size() > 0; i++) {
+ for (int i = 0; i < numTargets && !pruned.isEmpty(); i++) {
chosen.add(randomDatanodeByRemainingCache(block, pruned));
}
return chosen;
@@ -113,7 +119,7 @@ private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
Collections.shuffle(nodes);
final int additionalTargetsNeeded = effectiveReplication - replication;
int chosen = 0;
- while (chosen < additionalTargetsNeeded && nodes.size() > 0) {
+ while (chosen < additionalTargetsNeeded && !nodes.isEmpty()) {
targets.add(nodes.get(chosen));
chosen++;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 26a6ce9..c7a9f2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -156,6 +156,7 @@ synchronized void clear() {
/** A queue of blocks to be cached by this datanode */
private BlockQueue<Block> cacheBlocks = new BlockQueue<Block>();
/** A set of blocks to be uncached by this datanode */
+ // TODO: rename to blocksToUncache
private LightWeightHashSet<Block> uncacheBlocks =
new LightWeightHashSet<Block>();
@@ -290,6 +291,9 @@ int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
* @return true if block was successfully added, false if already present
*/
public boolean addCachedBlock(BlockInfo b) {
+ // TODO: any way to add an assertion that this BlockInfo is in fact
+ // a cloned copy? ie that we don't accidentally end up with one BlockInfo
+ // that is on both the cached and uncached lists?
if (!b.addNode(this))
return false;
// add to the head of the data-node list
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java
index e7cbee9..1ae963f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java
@@ -34,6 +34,13 @@
synchronized List<Block> invalidateWork(
final String storageId, final DatanodeDescriptor dn) {
final List<Block> toInvalidate = invalidateWorkInternal(storageId);
+// This is a little strange, since invalidateWorkInternal will use
+// datanodeManager.blockInvalidateLimit, which isn't really related to caching.
+// Would it be possible to split the data structure portion out of
+// InvalidateBlocks, and give it a method like pollN(storageID, limit)? Then
+// InvalidateCacheBlocks and InvalidateBlocks wouldn't have to have the inheritance
+// relationship (and would also help with things like the now-inappropriate dump() method
+// in the superclass which claims these blocks are ready to be deleted).
if (toInvalidate != null) {
dn.addBlocksToBeUncached(toInvalidate);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
index 6b07b78..01c31f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
@@ -35,6 +35,8 @@
/***************************************************
* PendingReplicationBlocks does the bookkeeping of all
* blocks that are getting replicated.
+ * TODO: add a note here that this is used both for replication of
+ * on-disk replicas as well as for pending requests to cache blocks
*
* It does the following:
* 1) record blocks that are getting replicated at this instant.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 555cec4..b210a6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -61,6 +61,7 @@
* The policy here is to keep those corrupt blocks replicated, but give
* blocks that are not corrupt higher priority.</li>
* </ol>
+ * TODO: add note that this is used for "under-cached" blocks as well.
*/
class UnderReplicatedBlocks implements Iterable<Block> {
/** The total number of queues : {@value} */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 15a7d2c..68d28d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -105,6 +105,14 @@ static long combinePreferredBlockSize(long header, long blockSize) {
private BlockInfo[] blocks;
private short cacheReplication = 0;
+ // TODO: Are we cool sparing the extra 2 bytes on every INode? I haven't seen this
+ // explicitly discussed anywhere, but admittedly I haven't followed as closely
+ // as I should have. Given that we expect cache capacity to be ~2 orders of magnitude
+ // smaller than disk capacity, it seems like it's a bit wasteful to reserve these bits
+ // in every file when 99% of files will set it to 0. I recall seeing another JIRA
+ // that discussed adding attributes to INodes without the cost of adding
+ // a field to every INode object. Another alternative is to steal some bits from the
+ // existing 64-bit header -- 48 bits is a lot for block size, as is 16 bits for replication.
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
BlockInfo[] blklist, short replication, long preferredBlockSize) {
{code}
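To make a few of the TODOs above more concrete: for the shared-interface idea in processCacheReport, here's roughly the shape I mean. Everything here is made up on the spot, nothing from the patch:
{code}
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;

// Hypothetical callback interface: the report-diffing loop would live in one
// place, and BlockManager / CacheReplicationManager would each implement this.
interface BlockReportCallbacks {
  /** Called for a reported block the NN has no record of on this node. */
  void newBlockReported(DatanodeDescriptor node, Block reported);
  /** Called for a previously-recorded block that is missing from this report. */
  void existingBlockReportedGone(DatanodeDescriptor node, Block gone);
}
{code}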
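For the foreach TODOs in CacheReplicationPolicy, this is all I'm suggesting -- same behavior, less ceremony. (The filter condition is my guess at the intent, since my excerpt above only shows the first line of the loop body:)
{code}
for (DatanodeDescriptor dn : targets) {
  // keep only nodes with enough cache capacity left to hold this block
  if (dn.getCacheRemaining() >= block.getNumBytes()) {
    pruned.add(dn);
  }
}
{code}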
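On the InvalidateCacheBlocks comment, the split I'm picturing is something like the sketch below. pollN(storageId, limit) is hypothetical; LightWeightHashSet.pollN(int) is what InvalidateBlocks already uses internally, and the map here mirrors its per-storage structure:
{code}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;

/** Sketch: just the per-storage queue, with no built-in notion of a delete limit. */
class PerStorageBlockQueue {
  private final Map<String, LightWeightHashSet<Block>> node2blocks =
      new HashMap<String, LightWeightHashSet<Block>>();

  /** Remove and return up to {@code limit} queued blocks for the given storage. */
  synchronized List<Block> pollN(String storageId, int limit) {
    LightWeightHashSet<Block> set = node2blocks.get(storageId);
    if (set == null) {
      return null;
    }
    List<Block> polled = set.pollN(limit);
    if (set.isEmpty()) {
      node2blocks.remove(storageId);
    }
    return polled;
  }
}
{code}
Then InvalidateBlocks would pass in blockInvalidateLimit, InvalidateCacheBlocks would pass whatever limit makes sense for caching, and neither would need to inherit from the other.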
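Last, an illustration of what stealing bits from the INodeFile header could look like. The widths are invented for the example (12 bits is still plenty for replication; 4 bits would cap cache replication at 15), so don't read them as a proposal for the exact layout:
{code}
// Sketch of a repacked 64-bit INodeFile header.
// Assumed current layout:  [16 bits replication | 48 bits preferredBlockSize]
// Sketched layout:         [12 bits replication | 4 bits cacheReplication | 48 bits size]
class HeaderFormat {
  static final int BLOCK_SIZE_BITS = 48;
  static final int CACHE_REPL_BITS = 4;
  static final long BLOCK_SIZE_MASK = (1L << BLOCK_SIZE_BITS) - 1;
  static final long CACHE_REPL_MASK = (1L << CACHE_REPL_BITS) - 1;

  static long getPreferredBlockSize(long header) {
    return header & BLOCK_SIZE_MASK;
  }

  static short getCacheReplication(long header) {
    return (short) ((header >>> BLOCK_SIZE_BITS) & CACHE_REPL_MASK);
  }

  static short getReplication(long header) {
    return (short) (header >>> (BLOCK_SIZE_BITS + CACHE_REPL_BITS));
  }

  static long combine(short replication, short cacheReplication, long blockSize) {
    return ((long) replication << (BLOCK_SIZE_BITS + CACHE_REPL_BITS))
        | (((long) cacheReplication & CACHE_REPL_MASK) << BLOCK_SIZE_BITS)
        | (blockSize & BLOCK_SIZE_MASK);
  }
}
{code}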
> NameNode should invoke DataNode APIs to coordinate caching
> ----------------------------------------------------------
>
> Key: HDFS-5053
> URL: https://issues.apache.org/jira/browse/HDFS-5053
> Project: Hadoop HDFS
> Issue Type: Sub-task
> Components: datanode, namenode
> Reporter: Colin Patrick McCabe
> Assignee: Andrew Wang
> Attachments: hdfs-5053-1.patch
>
>
> The NameNode should invoke the DataNode APIs to coordinate caching.