[
https://issues.apache.org/jira/browse/TAJO-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100380#comment-14100380
]
ASF GitHub Bot commented on TAJO-992:
-------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/115#discussion_r16339900
--- Diff:
tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HashShuffleAppender implements Appender {
+ private static Log LOG = LogFactory.getLog(HashShuffleAppender.class);
+
+ private FileAppender appender;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+ private int partId;
+
+ private TableStats tableStats;
+
+ //<taskId,<page start offset,<task start, task end>>>
+ private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer,
Integer>>>> taskTupleIndexes;
+
+ //page start offset, length
+ private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long,
Integer>>();
+
+ private Pair<Long, Integer> currentPage;
+
+ private int pageSize; //MB
+
+ private int rowNumInPage;
+
+ private int totalRows;
+
+ private long offset;
+
+ private ExecutionBlockId ebId;
+
+ public HashShuffleAppender(ExecutionBlockId ebId, int partId, int
pageSize, FileAppender appender) {
+ this.ebId = ebId;
+ this.partId = partId;
+ this.appender = appender;
+ this.pageSize = pageSize;
+ }
+
+ @Override
+ public void init() throws IOException {
+ currentPage = new Pair(0L, 0);
+ taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long,
Pair<Integer, Integer>>>>();
+ rowNumInPage = 0;
+ }
+
+ /**
+ * Write multiple tuples. Each tuple is written by a FileAppender which
is responsible specified partition.
+ * After writing if a current page exceeds pageSize, pageOffset will be
added.
+ * @param taskId
+ * @param tuples
+ * @return written bytes
+ * @throws IOException
+ */
+ public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples)
throws IOException {
+ synchronized(appender) {
+ if (closed.get()) {
+ return 0;
+ }
+ long currentPos = appender.getOffset();
+
+ for (Tuple eachTuple: tuples) {
+ appender.addTuple(eachTuple);
+ }
+ long posAfterWritten = appender.getOffset();
+
+ int writtenBytes = (int)(posAfterWritten - currentPos);
+
+ int nextRowNum = rowNumInPage + tuples.size();
+ List<Pair<Long, Pair<Integer, Integer>>> taskIndexes =
taskTupleIndexes.get(taskId);
+ if (taskIndexes == null) {
+ taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+ taskTupleIndexes.put(taskId, taskIndexes);
+ }
+ taskIndexes.add(
+ new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(),
new Pair(rowNumInPage, nextRowNum)));
+ rowNumInPage = nextRowNum;
+
+ if (posAfterWritten - currentPage.getFirst() > pageSize) {
+ nextPage(posAfterWritten);
+ rowNumInPage = 0;
+ }
+
+ totalRows += tuples.size();
+ return writtenBytes;
+ }
+ }
+
+ public long getOffset() throws IOException {
+ if (closed.get()) {
+ return offset;
+ } else {
+ return appender.getOffset();
+ }
+ }
+
+ private void nextPage(long pos) {
+ currentPage.setSecond((int) (pos - currentPage.getFirst()));
+ pages.add(currentPage);
+ currentPage = new Pair(pos, 0);
+ }
+
+ @Override
+ public void addTuple(Tuple t) throws IOException {
+ throw new IOException("Not support addTuple, use addTuples()");
+ }
+
+ @Override
+ public void flush() throws IOException {
+ synchronized(appender) {
+ if (closed.get()) {
+ return;
+ }
+ appender.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized(appender) {
+ if (closed.get()) {
+ return;
+ }
+ appender.flush();
+ offset = appender.getOffset();
+ if (offset > currentPage.getFirst()) {
+ nextPage(offset);
+ }
+ appender.close();
+ if (LOG.isDebugEnabled()) {
+ if (!pages.isEmpty()) {
+ LOG.info(ebId + ",partId=" + partId + " Appender closed:
fileLen=" + offset + ", pages=" + pages.size()
+ + ", lastPage=" + pages.get(pages.size() - 1));
+ } else {
+ LOG.info(ebId + ",partId=" + partId + " Appender closed:
fileLen=" + offset + ", pages=" + pages.size());
+ }
+ }
+ closed.set(true);
+ tableStats = appender.getStats();
+ }
+ }
+
+ @Override
+ public void enableStats() {
+ }
+
+ @Override
+ public TableStats getStats() {
+ synchronized(appender) {
+ return appender.getStats();
+ }
+ }
+
+ public List<Pair<Long, Integer>> getPages() {
+ return pages;
+ }
+
+ public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>
getTaskTupleIndexes() {
+ return taskTupleIndexes;
+ }
+
+ public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
+ List<Pair<Long, Pair<Integer, Integer>>> merged = new
ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+
+ for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex:
taskTupleIndexes.values()) {
+ merged.addAll(eachFailureIndex);
+ }
+
+ return merged;
+ }
+
+ public void taskFinished(QueryUnitAttemptId taskId) {
--- End diff --
In my view, finishTask or finializeTask may be more proper name for this
purpose.
> Reduce number of hash shuffle output file.
> ------------------------------------------
>
> Key: TAJO-992
> URL: https://issues.apache.org/jira/browse/TAJO-992
> Project: Tajo
> Issue Type: Sub-task
> Components: data shuffle
> Reporter: Hyoungjun Kim
> Assignee: Hyoungjun Kim
>
> Currently Tajo creates too many intermediate files in the case of hash
> shuffle. A execution block(SubQuery) on a TajoWorker creates intermediate
> files as following rule:
> # intermediate files in a worker = # tasks / # workers * # partitions
> This may cause 'too many file opens' error and makes it difficult to scale
> out. To solve this problem, We should reduce number of hash shuffle output
> file.
--
This message was sent by Atlassian JIRA
(v6.2#6252)