Hello everyone,

I am going through the source code of MapReduce. In MergeQueue.merge, I can only see that the segments are collected and sorted by length into a list for merging. However, I could not find the code that sorts the (key, value) pairs inside those segments by key.
Here is the function:

    RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
    .
    .
    .

        //if we have lesser number of segments remaining, then just return the
        //iterator, else do another single level merge
        if (numSegments <= factor) {
          // Reset totalBytesProcessed to track the progress of the final merge.
          // This is considered the progress of the reducePhase, the 3rd phase
          // of reduce task. Currently totalBytesProcessed is not used in sort
          // phase of reduce task(i.e. when intermediate merges happen).
          totalBytesProcessed = startBytes;

          //calculate the length of the remaining segments. Required for
          //calculating the merge progress
          long totalBytes = 0;
          for (int i = 0; i < segmentsToMerge.size(); i++) {
            totalBytes += segmentsToMerge.get(i).getLength();
          }
          if (totalBytes != 0) //being paranoid
            progPerByte = 1.0f / (float)totalBytes;

          if (totalBytes != 0)
            mergeProgress.set(totalBytesProcessed * progPerByte);
          else
            mergeProgress.set(1.0f); // Last pass and no segments left - we're done

          LOG.info("Down to the last merge-pass, with " + numSegments +
                   " segments left of total size: " + totalBytes + " bytes");
          return this;
        } else {
          LOG.info("Merging " + segmentsToMerge.size() +
                   " intermediate segments out of a total of " +
                   (segments.size()+segmentsToMerge.size()));

          //we want to spread the creation of temp files on multiple disks if
          //available under the space constraints
          long approxOutputSize = 0;
          for (Segment<K, V> s : segmentsToMerge) {
            approxOutputSize += s.getLength() +
                                ChecksumFileSystem.getApproxChkSumLength(
                                s.getLength());
          }
          Path tmpFilename =
            new Path(tmpDir, "intermediate").suffix("." + passNo);

          Path outputFile = lDirAlloc.getLocalPathForWrite(
                                              tmpFilename.toString(),
                                              approxOutputSize, conf);

          Writer<K, V> writer =
            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
                             writesCounter);
          *writeFile(this, writer, reporter, conf);*
          writer.close();

          //we finished one single level merge; now clean up the priority
          //queue
          this.close();

          // Add the newly create segment to the list of segments to be merged
          Segment<K, V> tempSegment =
            new Segment<K, V>(conf, fs, outputFile, codec, false);
          segments.add(tempSegment);
          numSegments = segments.size();
          Collections.sort(segments, segmentComparator);

          passNo++;
        }
        //we are worried about only the first pass merge factor. So reset the
        //factor to what it originally was
        factor = origFactor;
      } while(true);
    }

As far as I can see, if the number of segments is at most factor, the segments are returned (is this right?). Otherwise, factor segments are merged at a time, pass by pass. But how do the <K, V> pairs in different segments get sorted into key order?

Elton
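
For reference, the general technique for combining several runs that are each already sorted by key is a k-way merge driven by a priority queue: the queue is ordered by the current key of each run, so the head is always the smallest key not yet emitted. The sketch below is a generic illustration in plain Java, not Hadoop's code. The names KWayMergeSketch, Cursor, and kv are hypothetical, and it assumes every input run is already sorted by key; if the runs were unsorted, this merge would not produce sorted output.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class KWayMergeSketch {

  /** Hypothetical stand-in for a segment: a cursor over one already-sorted run. */
  static final class Cursor {
    final Iterator<Map.Entry<String, Integer>> it;
    Map.Entry<String, Integer> current;

    Cursor(List<Map.Entry<String, Integer>> run) {
      this.it = run.iterator();
    }

    /** Moves to the next record; returns false when the run is exhausted. */
    boolean advance() {
      if (!it.hasNext()) {
        return false;
      }
      current = it.next();
      return true;
    }
  }

  /** Hypothetical helper that builds one (key, value) record. */
  static Map.Entry<String, Integer> kv(String key, int value) {
    return new SimpleEntry<>(key, value);
  }

  /** Merges runs that are each sorted by key into one key-sorted list. */
  static List<Map.Entry<String, Integer>> merge(
      List<List<Map.Entry<String, Integer>>> runs) {
    // Order the heap by each cursor's current key, so the head always
    // holds the smallest key that has not been emitted yet.
    PriorityQueue<Cursor> heap = new PriorityQueue<>(
        (a, b) -> a.current.getKey().compareTo(b.current.getKey()));
    for (List<Map.Entry<String, Integer>> run : runs) {
      Cursor c = new Cursor(run);
      if (c.advance()) {
        heap.add(c);
      }
    }

    List<Map.Entry<String, Integer>> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Cursor smallest = heap.poll();
      out.add(smallest.current);
      // Put the cursor back if its run still has records.
      if (smallest.advance()) {
        heap.add(smallest);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, Integer>>> runs = Arrays.asList(
        Arrays.asList(kv("a", 1), kv("d", 4)),
        Arrays.asList(kv("b", 2), kv("c", 3)));
    System.out.println(merge(runs)); // prints [a=1, b=2, c=3, d=4]
  }
}
```

No step here sorts a whole run; the output order comes only from the per-record key comparison inside the queue, which is why each input has to be sorted beforehand.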