[ 
https://issues.apache.org/jira/browse/TAJO-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100389#comment-14100389
 ] 

ASF GitHub Bot commented on TAJO-992:
-------------------------------------

Github user babokim commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/115#discussion_r16340019
  
    --- Diff: 
tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
 ---
    @@ -0,0 +1,240 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.storage;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.LocalDirAllocator;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.tajo.ExecutionBlockId;
    +import org.apache.tajo.QueryUnitAttemptId;
    +import org.apache.tajo.catalog.Schema;
    +import org.apache.tajo.catalog.TableMeta;
    +import org.apache.tajo.conf.TajoConf;
    +import org.apache.tajo.conf.TajoConf.ConfVars;
    +import org.apache.tajo.util.Pair;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +public class HashShuffleAppenderManager {
    +  private static final Log LOG = 
LogFactory.getLog(HashShuffleAppenderManager.class);
    +
    +  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> 
appenderMap =
    +      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, 
PartitionAppenderMeta>>();
    +  private TajoConf systemConf;
    +  private FileSystem defaultFS;
    +  private FileSystem localFS;
    +  private LocalDirAllocator lDirAllocator;
    +  private int pageSize;
    +
    +  public HashShuffleAppenderManager(TajoConf systemConf) throws 
IOException {
    +    this.systemConf = systemConf;
    +
    +    // initialize LocalDirAllocator
    +    lDirAllocator = new 
LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
    +
    +    // initialize DFS and LocalFileSystems
    +    defaultFS = 
TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
    +    localFS = FileSystem.getLocal(systemConf);
    +    pageSize = 
systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
    +  }
    +
    +  public HashShuffleAppender getAppender(TajoConf tajoConf, 
ExecutionBlockId ebId, int partId,
    +                              TableMeta meta, Schema outSchema) throws 
IOException {
    +    synchronized (appenderMap) {
    +      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = 
appenderMap.get(ebId);
    +
    +      if (partitionAppenderMap == null) {
    +        partitionAppenderMap = new ConcurrentHashMap<Integer, 
PartitionAppenderMeta>();
    +        appenderMap.put(ebId, partitionAppenderMap);
    +      }
    +
    +      PartitionAppenderMeta partitionAppenderMeta = 
partitionAppenderMap.get(partId);
    +      if (partitionAppenderMeta == null) {
    +        Path dataFile = getDataFile(ebId, partId);
    +        FileSystem fs = dataFile.getFileSystem(systemConf);
    +        if (fs.exists(dataFile)) {
    +          FileStatus status = fs.getFileStatus(dataFile);
    +          LOG.info("File " + dataFile + " already exists, size=" + 
status.getLen());
    +        }
    +
    +        if (!fs.exists(dataFile.getParent())) {
    +          fs.mkdirs(dataFile.getParent());
    +        }
    +        FileAppender appender = (FileAppender) 
StorageManagerFactory.getStorageManager(
    +            tajoConf).getAppender(meta, outSchema, dataFile);
    +        appender.enableStats();
    +        appender.init();
    +
    +        partitionAppenderMeta = new PartitionAppenderMeta();
    +        partitionAppenderMeta.partId = partId;
    +        partitionAppenderMeta.dataFile = dataFile;
    +        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, 
partId, pageSize, appender);
    +        partitionAppenderMeta.appender.init();
    +        partitionAppenderMap.put(partId, partitionAppenderMeta);
    +
    +        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + 
dataFile);
    +      }
    +
    +      return partitionAppenderMeta.appender;
    +    }
    +  }
    +
    +  public static int getPartParentId(int partId, TajoConf tajoConf) {
    +    return partId % 
tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
    --- End diff --
    
    The getPartParentId() method is used by TajoPullServerService.


> Reduce number of hash shuffle output file.
> ------------------------------------------
>
>                 Key: TAJO-992
>                 URL: https://issues.apache.org/jira/browse/TAJO-992
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: data shuffle
>            Reporter: Hyoungjun Kim
>            Assignee: Hyoungjun Kim
>
> Currently, Tajo creates too many intermediate files in the case of hash 
> shuffle. An execution block (SubQuery) on a TajoWorker creates intermediate 
> files according to the following rule:
>   # intermediate files in a worker = # tasks / # workers * # partitions 
> This may cause a 'too many open files' error and makes it difficult to scale 
> out. To solve this problem, we should reduce the number of hash shuffle output 
> files.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to