Author: edwardyoon
Date: Fri Feb 28 06:35:46 2014
New Revision: 1572838
URL: http://svn.apache.org/r1572838
Log:
HAMA-878: PartitioningRunner.Sorter doesn't allow duplicate keys
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1572838&r1=1572837&r2=1572838&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java	(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java	Fri Feb 28 06:35:46 2014
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -92,10 +90,11 @@ public class PartitioningRunner extends
* @param conf Configuration of the job.
* @return the Key-Value pair instance of the expected sequential format.
* Should return null if the conversion was not successful.
- * @throws IOException
+ * @throws IOException
*/
public KeyValuePair<Writable, Writable> convertRecord(
- KeyValuePair<Writable, Writable> inputRecord, Configuration conf)
throws IOException;
+ KeyValuePair<Writable, Writable> inputRecord, Configuration conf)
+ throws IOException;
public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
@SuppressWarnings("rawtypes")
@@ -160,7 +159,7 @@ public class PartitioningRunner extends
if (convertedRecord == null) {
throw new IOException("The converted record can't be null.");
}
-
+
Writable convertedKey = convertedRecord.getKey();
convertedKeyClass = convertedKey.getClass();
@@ -213,8 +212,12 @@ public class PartitioningRunner extends
}
}
+ // public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>>
+ // comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer,
+ // KeyValuePair>>();
+
@SuppressWarnings("rawtypes")
- public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>>
comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer,
KeyValuePair>>();
+ public Map<Integer, KeyValuePair<WritableComparable, MapWritable>>
candidates = new HashMap<Integer, KeyValuePair<WritableComparable,
MapWritable>>();
@SuppressWarnings({ "rawtypes", "unchecked" })
private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
@@ -246,7 +249,9 @@ public class PartitioningRunner extends
value = new MapWritable();
readers.get(i).next(convertedKey, value);
- comparisonMap.put(convertedKey, new KeyValuePair(i, value));
+ // comparisonMap.put(convertedKey, new KeyValuePair(i, value));
+ // Integer, KeyValuePair<WritableComparable, MapWritable>
+ candidates.put(i, new KeyValuePair(convertedKey, value));
}
while (readers.size() > 0) {
@@ -254,27 +259,41 @@ public class PartitioningRunner extends
convertedKeyClass, conf);
value = new MapWritable();
- WritableComparable firstKey = comparisonMap.firstKey();
- KeyValuePair kv = comparisonMap.get(firstKey);
-
- int readerIndex = (Integer) kv.getKey();
- MapWritable rawRecord = (MapWritable) kv.getValue();
+ int readerIndex = 0;
+ WritableComparable firstKey = null;
+ MapWritable rawRecord = null;
+
+ for (Map.Entry<Integer, KeyValuePair<WritableComparable, MapWritable>>
keys : candidates
+ .entrySet()) {
+ if (firstKey == null) {
+ readerIndex = keys.getKey();
+ firstKey = keys.getValue().getKey();
+ rawRecord = (MapWritable) keys.getValue().getValue();
+ } else {
+ WritableComparable currentKey = keys.getValue().getKey();
+ if (firstKey.compareTo(currentKey) > 0) {
+ readerIndex = keys.getKey();
+ firstKey = currentKey;
+ rawRecord = (MapWritable) keys.getValue().getValue();
+ }
+ }
+ }
for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
writer.append(e.getKey(), e.getValue());
}
- comparisonMap.remove(firstKey);
+ candidates.remove(readerIndex);
if (readers.get(readerIndex).next(convertedKey, value)) {
- comparisonMap.put(convertedKey, new KeyValuePair(readerIndex, value));
+ candidates.put(readerIndex, new KeyValuePair(convertedKey, value));
} else {
readers.get(readerIndex).close();
readers.remove(readerIndex);
}
}
- comparisonMap.clear();
+ candidates.clear();
writer.close();
}