Author: edwardyoon
Date: Fri Feb 28 06:35:46 2014
New Revision: 1572838

URL: http://svn.apache.org/r1572838
Log:
HAMA-878: PartitioningRunner.Sorter doesn't allow duplicate keys

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1572838&r1=1572837&r2=1572838&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Fri Feb 28 06:35:46 2014
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,10 +90,11 @@ public class PartitioningRunner extends
      * @param conf Configuration of the job.
      * @return the Key-Value pair instance of the expected sequential format.
      *         Should return null if the conversion was not successful.
-     * @throws IOException 
+     * @throws IOException
      */
     public KeyValuePair<Writable, Writable> convertRecord(
-        KeyValuePair<Writable, Writable> inputRecord, Configuration conf) 
throws IOException;
+        KeyValuePair<Writable, Writable> inputRecord, Configuration conf)
+        throws IOException;
 
     public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
         @SuppressWarnings("rawtypes")
@@ -160,7 +159,7 @@ public class PartitioningRunner extends
       if (convertedRecord == null) {
         throw new IOException("The converted record can't be null.");
       }
-      
+
       Writable convertedKey = convertedRecord.getKey();
       convertedKeyClass = convertedKey.getClass();
 
@@ -213,8 +212,12 @@ public class PartitioningRunner extends
     }
   }
 
+  // public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>>
+  // comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer,
+  // KeyValuePair>>();
+
   @SuppressWarnings("rawtypes")
-  public SortedMap<WritableComparable, KeyValuePair<Integer, KeyValuePair>> 
comparisonMap = new TreeMap<WritableComparable, KeyValuePair<Integer, 
KeyValuePair>>();
+  public Map<Integer, KeyValuePair<WritableComparable, MapWritable>> 
candidates = new HashMap<Integer, KeyValuePair<WritableComparable, 
MapWritable>>();
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
@@ -246,7 +249,9 @@ public class PartitioningRunner extends
       value = new MapWritable();
 
       readers.get(i).next(convertedKey, value);
-      comparisonMap.put(convertedKey, new KeyValuePair(i, value));
+      // comparisonMap.put(convertedKey, new KeyValuePair(i, value));
+      // Integer, KeyValuePair<WritableComparable, MapWritable>
+      candidates.put(i, new KeyValuePair(convertedKey, value));
     }
 
     while (readers.size() > 0) {
@@ -254,27 +259,41 @@ public class PartitioningRunner extends
           convertedKeyClass, conf);
       value = new MapWritable();
 
-      WritableComparable firstKey = comparisonMap.firstKey();
-      KeyValuePair kv = comparisonMap.get(firstKey);
-
-      int readerIndex = (Integer) kv.getKey();
-      MapWritable rawRecord = (MapWritable) kv.getValue();
+      int readerIndex = 0;
+      WritableComparable firstKey = null;
+      MapWritable rawRecord = null;
+
+      for (Map.Entry<Integer, KeyValuePair<WritableComparable, MapWritable>> 
keys : candidates
+          .entrySet()) {
+        if (firstKey == null) {
+          readerIndex = keys.getKey();
+          firstKey = keys.getValue().getKey();
+          rawRecord = (MapWritable) keys.getValue().getValue();
+        } else {
+          WritableComparable currentKey = keys.getValue().getKey();
+          if (firstKey.compareTo(currentKey) > 0) {
+            readerIndex = keys.getKey();
+            firstKey = currentKey;
+            rawRecord = (MapWritable) keys.getValue().getValue();
+          }
+        }
+      }
 
       for (Map.Entry<Writable, Writable> e : rawRecord.entrySet()) {
         writer.append(e.getKey(), e.getValue());
       }
 
-      comparisonMap.remove(firstKey);
+      candidates.remove(readerIndex);
 
       if (readers.get(readerIndex).next(convertedKey, value)) {
-        comparisonMap.put(convertedKey, new KeyValuePair(readerIndex, value));
+        candidates.put(readerIndex, new KeyValuePair(convertedKey, value));
       } else {
         readers.get(readerIndex).close();
         readers.remove(readerIndex);
       }
     }
 
-    comparisonMap.clear();
+    candidates.clear();
     writer.close();
   }
 


Reply via email to