Author: edwardyoon
Date: Fri Jan 18 02:07:25 2013
New Revision: 1435008
URL: http://svn.apache.org/viewvc?rev=1435008&view=rev
Log:
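HAMA-716: Fix PartitioningRunner bug: name each merged partition file after the partition ID parsed from its directory name instead of the directory-listing index, and add debug logging of each merge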
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1435008&r1=1435007&r2=1435008&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Fri Jan 18 02:07:25 2013
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +38,7 @@ import org.apache.hama.util.KeyValuePair
public class PartitioningRunner extends
BSP<Writable, Writable, Writable, Writable, NullWritable> {
+ public static final Log LOG = LogFactory.getLog(PartitioningRunner.class);
private Configuration conf;
private int desiredNum;
@@ -91,10 +94,9 @@ public class PartitioningRunner extends
KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
- @SuppressWarnings("rawtypes")
- Partitioner partitioner, Configuration conf,
- @SuppressWarnings("rawtypes")
- BSPPeer peer, int numTasks);
+ @SuppressWarnings("rawtypes") Partitioner partitioner,
+ Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+ int numTasks);
}
/**
@@ -111,10 +113,9 @@ public class PartitioningRunner extends
@SuppressWarnings("unchecked")
@Override
public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
- @SuppressWarnings("rawtypes")
- Partitioner partitioner, Configuration conf,
- @SuppressWarnings("rawtypes")
- BSPPeer peer, int numTasks) {
+ @SuppressWarnings("rawtypes") Partitioner partitioner,
+ Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+ int numTasks) {
return Math.abs(partitioner.getPartition(outputRecord.getKey(),
outputRecord.getValue(), numTasks));
}
@@ -179,20 +180,27 @@ public class PartitioningRunner extends
// scan.
FileStatus[] status = fs.listStatus(partitionDir);
for (int j = 0; j < status.length; j++) {
- int idx =
Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
- int assignedID = idx / (desiredNum / peer.getNumPeers());
+ int partitionID = Integer.parseInt(status[j].getPath().getName()
+ .split("[-]")[1]);
+ int assignedID = partitionID / (desiredNum / peer.getNumPeers());
if (assignedID == peer.getNumPeers())
assignedID = assignedID - 1;
// TODO set replica factor to 1.
// TODO and check whether we can write to specific DataNode.
if (assignedID == peer.getPeerIndex()) {
+ Path partitionFile = new Path(partitionDir + "/"
+ + getPartitionName(partitionID));
+
FileStatus[] files = fs.listStatus(status[j].getPath());
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- new Path(partitionDir + "/" + getPartitionName(j)), outputKeyClass,
- outputValueClass, CompressionType.NONE);
+ partitionFile, outputKeyClass, outputValueClass,
+ CompressionType.NONE);
for (int i = 0; i < files.length; i++) {
+ LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
+ + "/" + getPartitionName(partitionID));
+
SequenceFile.Reader reader = new SequenceFile.Reader(fs,
files[i].getPath(), conf);