[ 
https://issues.apache.org/jira/browse/TAJO-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100380#comment-14100380
 ] 

ASF GitHub Bot commented on TAJO-992:
-------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/115#discussion_r16339900
  
    --- Diff: 
tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---
    @@ -0,0 +1,204 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.storage;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.tajo.ExecutionBlockId;
    +import org.apache.tajo.QueryUnitAttemptId;
    +import org.apache.tajo.catalog.statistics.TableStats;
    +import org.apache.tajo.util.Pair;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +public class HashShuffleAppender implements Appender {
    +  private static Log LOG = LogFactory.getLog(HashShuffleAppender.class);
    +
    +  private FileAppender appender;
    +  private AtomicBoolean closed = new AtomicBoolean(false);
    +  private int partId;
    +
    +  private TableStats tableStats;
    +
    +  //<taskId,<page start offset,<task start, task end>>>
    +  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, 
Integer>>>> taskTupleIndexes;
    +
    +  //page start offset, length
    +  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, 
Integer>>();
    +
    +  private Pair<Long, Integer> currentPage;
    +
    +  private int pageSize; //MB
    +
    +  private int rowNumInPage;
    +
    +  private int totalRows;
    +
    +  private long offset;
    +
    +  private ExecutionBlockId ebId;
    +
    +  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int 
pageSize, FileAppender appender) {
    +    this.ebId = ebId;
    +    this.partId = partId;
    +    this.appender = appender;
    +    this.pageSize = pageSize;
    +  }
    +
    +  @Override
    +  public void init() throws IOException {
    +    currentPage = new Pair(0L, 0);
    +    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, 
Pair<Integer, Integer>>>>();
    +    rowNumInPage = 0;
    +  }
    +
    +  /**
    +   * Write multiple tuples. Each tuple is written by a FileAppender which 
is responsible specified partition.
    +   * After writing if a current page exceeds pageSize, pageOffset will be 
added.
    +   * @param taskId
    +   * @param tuples
    +   * @return written bytes
    +   * @throws IOException
    +   */
    +  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) 
throws IOException {
    +    synchronized(appender) {
    +      if (closed.get()) {
    +        return 0;
    +      }
    +      long currentPos = appender.getOffset();
    +
    +      for (Tuple eachTuple: tuples) {
    +        appender.addTuple(eachTuple);
    +      }
    +      long posAfterWritten = appender.getOffset();
    +
    +      int writtenBytes = (int)(posAfterWritten - currentPos);
    +
    +      int nextRowNum = rowNumInPage + tuples.size();
    +      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = 
taskTupleIndexes.get(taskId);
    +      if (taskIndexes == null) {
    +        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
    +        taskTupleIndexes.put(taskId, taskIndexes);
    +      }
    +      taskIndexes.add(
    +          new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), 
new Pair(rowNumInPage, nextRowNum)));
    +      rowNumInPage = nextRowNum;
    +
    +      if (posAfterWritten - currentPage.getFirst() > pageSize) {
    +        nextPage(posAfterWritten);
    +        rowNumInPage = 0;
    +      }
    +
    +      totalRows += tuples.size();
    +      return writtenBytes;
    +    }
    +  }
    +
    +  public long getOffset() throws IOException {
    +    if (closed.get()) {
    +      return offset;
    +    } else {
    +      return appender.getOffset();
    +    }
    +  }
    +
    +  private void nextPage(long pos) {
    +    currentPage.setSecond((int) (pos - currentPage.getFirst()));
    +    pages.add(currentPage);
    +    currentPage = new Pair(pos, 0);
    +  }
    +
    +  @Override
    +  public void addTuple(Tuple t) throws IOException {
    +    throw new IOException("Not support addTuple, use addTuples()");
    +  }
    +
    +  @Override
    +  public void flush() throws IOException {
    +    synchronized(appender) {
    +      if (closed.get()) {
    +        return;
    +      }
    +      appender.flush();
    +    }
    +  }
    +
    +  @Override
    +  public void close() throws IOException {
    +    synchronized(appender) {
    +      if (closed.get()) {
    +        return;
    +      }
    +      appender.flush();
    +      offset = appender.getOffset();
    +      if (offset > currentPage.getFirst()) {
    +        nextPage(offset);
    +      }
    +      appender.close();
    +      if (LOG.isDebugEnabled()) {
    +        if (!pages.isEmpty()) {
    +          LOG.info(ebId + ",partId=" + partId + " Appender closed: 
fileLen=" + offset + ", pages=" + pages.size()
    +              + ", lastPage=" + pages.get(pages.size() - 1));
    +        } else {
    +          LOG.info(ebId + ",partId=" + partId + " Appender closed: 
fileLen=" + offset + ", pages=" + pages.size());
    +        }
    +      }
    +      closed.set(true);
    +      tableStats = appender.getStats();
    +    }
    +  }
    +
    +  @Override
    +  public void enableStats() {
    +  }
    +
    +  @Override
    +  public TableStats getStats() {
    +    synchronized(appender) {
    +      return appender.getStats();
    +    }
    +  }
    +
    +  public List<Pair<Long, Integer>> getPages() {
    +    return pages;
    +  }
    +
    +  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> 
getTaskTupleIndexes() {
    +    return taskTupleIndexes;
    +  }
    +
    +  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
    +    List<Pair<Long, Pair<Integer, Integer>>> merged = new 
ArrayList<Pair<Long, Pair<Integer, Integer>>>();
    +
    +    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: 
taskTupleIndexes.values()) {
    +      merged.addAll(eachFailureIndex);
    +    }
    +
    +    return merged;
    +  }
    +
    +  public void taskFinished(QueryUnitAttemptId taskId) {
    --- End diff --
    
    In my view, finishTask or finalizeTask may be a more proper name for this 
purpose.


> Reduce number of hash shuffle output file.
> ------------------------------------------
>
>                 Key: TAJO-992
>                 URL: https://issues.apache.org/jira/browse/TAJO-992
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: data shuffle
>            Reporter: Hyoungjun Kim
>            Assignee: Hyoungjun Kim
>
> Currently Tajo creates too many intermediate files in the case of hash 
> shuffle. An execution block (SubQuery) on a TajoWorker creates intermediate 
> files according to the following rule:
>   # intermediate files in a worker = # tasks / # workers * # partitions 
> This may cause a 'too many open files' error and makes it difficult to scale 
> out. To solve this problem, we should reduce the number of hash shuffle 
> output files.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to