[ https://issues.apache.org/jira/browse/TAJO-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100305#comment-14100305 ]

ASF GitHub Bot commented on TAJO-992:
-------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/115#discussion_r16338083
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---
    @@ -923,20 +960,62 @@ public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext sche
     
         LOG.info(subQuery.getId()
             + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
    -        + ", DeterminedTaskNum : " + fetches.size());
    +        + ", Intermediate Size: " + totalIntermediateSize
    +        + ", splitSize: " + splitVolume
    +        + ", DeterminedTaskNum: " + fetches.size());
       }
     
    -  static class IntermediateEntryComparator implements Comparator<IntermediateEntry> {
    +  /**
    +   * If a IntermediateEntry is large than splitVolume, List<FetchImpl> has single element.
    +   * @param ebId
    +   * @param entries
    +   * @param splitVolume
    +   * @return
    +   */
    +  public static List<List<FetchImpl>> splitOrMergeIntermediates(
    +      ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
    +    // Each List<FetchImpl> has splitVolume size.
    +    List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
    +
    +    Iterator<IntermediateEntry> iter = entries.iterator();
    +    if (!iter.hasNext()) {
    +      return null;
    +    }
    +    List<FetchImpl> fetchListForSingleTask = new ArrayList<FetchImpl>();
    +    long fetchListVolume = 0;
     
    -    @Override
    -    public int compare(IntermediateEntry o1, IntermediateEntry o2) {
    -      int cmp = Ints.compare(o1.getPartId(), o2.getPartId());
    -      if (cmp != 0) {
    -        return cmp;
    +    while (iter.hasNext()) {
    +      IntermediateEntry currentInterm = iter.next();
    +
    +      long firstSplitVolume = splitVolume - fetchListVolume;
    +      if (firstSplitVolume < pageSize) {
    +        firstSplitVolume = splitVolume;
    +      }
    +      List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
    --- End diff --
    
    Could you add a comment explaining what each long in the pair means?


> Reduce number of hash shuffle output file.
> ------------------------------------------
>
>                 Key: TAJO-992
>                 URL: https://issues.apache.org/jira/browse/TAJO-992
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: data shuffle
>            Reporter: Hyoungjun Kim
>            Assignee: Hyoungjun Kim
>
> Currently, Tajo creates too many intermediate files in the case of hash
> shuffle. An execution block (SubQuery) on a TajoWorker creates intermediate
> files according to the following rule:
>   # intermediate files in a worker = # tasks / # workers * # partitions
> This may cause a 'too many open files' error and makes it difficult to scale
> out. To solve this problem, we should reduce the number of hash shuffle
> output files.
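
To make the rule quoted above concrete, here is a minimal Java sketch that only evaluates the formula from the issue description. The class name, method name, and the sample figures (1000 tasks, 10 workers, 32 partitions) are hypothetical and are not taken from Tajo or from the issue.

```java
public class IntermediateFileEstimate {
  // Rule as quoted in the issue description:
  //   # intermediate files in a worker = # tasks / # workers * # partitions
  // Integer division is used here, so a task count that does not divide
  // evenly across workers is rounded down (an assumption of this sketch).
  static long filesPerWorker(long tasks, long workers, long partitions) {
    return tasks / workers * partitions;
  }

  public static void main(String[] args) {
    // Hypothetical workload: 1000 tasks, 10 workers, 32 hash partitions.
    System.out.println(filesPerWorker(1000, 10, 32));
  }
}
```

With these illustrative numbers each worker would hold 3200 intermediate files, which suggests how quickly the count can approach a per-process open-file limit as tasks or partitions grow.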



--
This message was sent by Atlassian JIRA
(v6.2#6252)
