Author: edwardyoon
Date: Sun Jan 6 21:35:25 2013
New Revision: 1429600
URL: http://svn.apache.org/viewvc?rev=1429600&view=rev
Log:
Add comments
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1429600&r1=1429599&r2=1429600&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Sun Jan 6 21:35:25 2013
@@ -86,6 +86,7 @@ public class PartitioningRunner extends
values.get(index).put(pair.getKey(), pair.getValue());
}
+ // Records are held in memory first to reduce the number of file opens.
for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+ peer.getPeerIndex());
@@ -100,13 +101,16 @@ public class PartitioningRunner extends
peer.sync();
// merge files into one.
+ // TODO if we use header info, we might be able to merge files without a full scan.
FileStatus[] status = fs.listStatus(partitionDir);
for (int j = 0; j < status.length; j++) {
int idx =
Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
int assignedID = idx / (desiredNum / peer.getNumPeers());
if (assignedID == peer.getNumPeers())
assignedID = assignedID - 1;
-
+
+ // TODO set replication factor to 1.
+ // TODO and check whether we can write to a specific DataNode.
if (assignedID == peer.getPeerIndex()) {
FileStatus[] files = fs.listStatus(status[j].getPath());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,