[
https://issues.apache.org/jira/browse/TAJO-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100341#comment-14100341
]
ASF GitHub Bot commented on TAJO-992:
-------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/115#discussion_r16338719
--- Diff:
tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
---
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HashShuffleAppenderManager {
+ private static final Log LOG =
LogFactory.getLog(HashShuffleAppenderManager.class);
+
+ private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>
appenderMap =
+ new ConcurrentHashMap<ExecutionBlockId, Map<Integer,
PartitionAppenderMeta>>(); // ebId -> (partition id -> appender metadata); read and populated in getAppender under synchronized(appenderMap)
+ private TajoConf systemConf; // configuration given to the constructor; used to resolve file systems and local dirs
+ private FileSystem defaultFS; // file system of the Tajo root dir, assigned in the constructor
+ private FileSystem localFS; // local file system; qualifies paths returned by lDirAllocator
+ private LocalDirAllocator lDirAllocator; // allocates local paths from the dirs configured under WORKER_TEMPORAL_DIR
+ private int pageSize; // bytes: SHUFFLE_HASH_APPENDER_PAGE_VOLUME (MB) * 1024 * 1024, passed to each HashShuffleAppender
+
+ public HashShuffleAppenderManager(TajoConf systemConf) throws
IOException {
+ this.systemConf = systemConf;
+
+ // initialize LocalDirAllocator
+ lDirAllocator = new
LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+ // initialize DFS and LocalFileSystems
+ defaultFS =
TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+ localFS = FileSystem.getLocal(systemConf);
+ pageSize =
systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+ }
+
+ public HashShuffleAppender getAppender(TajoConf tajoConf,
ExecutionBlockId ebId, int partId,
+ TableMeta meta, Schema outSchema) throws
IOException {
+ synchronized (appenderMap) {
+ Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
appenderMap.get(ebId);
+
+ if (partitionAppenderMap == null) {
+ partitionAppenderMap = new ConcurrentHashMap<Integer,
PartitionAppenderMeta>();
+ appenderMap.put(ebId, partitionAppenderMap);
+ }
+
+ PartitionAppenderMeta partitionAppenderMeta =
partitionAppenderMap.get(partId);
+ if (partitionAppenderMeta == null) {
+ Path dataFile = getDataFile(ebId, partId);
+ FileSystem fs = dataFile.getFileSystem(systemConf);
+ if (fs.exists(dataFile)) {
+ FileStatus status = fs.getFileStatus(dataFile);
+ LOG.info("File " + dataFile + " already exists, size=" +
status.getLen());
+ }
+
+ if (!fs.exists(dataFile.getParent())) {
+ fs.mkdirs(dataFile.getParent());
+ }
+ FileAppender appender = (FileAppender)
StorageManagerFactory.getStorageManager(
+ tajoConf).getAppender(meta, outSchema, dataFile);
+ appender.enableStats();
+ appender.init();
+
+ partitionAppenderMeta = new PartitionAppenderMeta();
+ partitionAppenderMeta.partId = partId;
+ partitionAppenderMeta.dataFile = dataFile;
+ partitionAppenderMeta.appender = new HashShuffleAppender(ebId,
partId, pageSize, appender);
+ partitionAppenderMeta.appender.init();
+ partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+ LOG.info("Create Hash shuffle file(partId=" + partId + "): " +
dataFile);
+ }
+
+ return partitionAppenderMeta.appender;
+ }
+ }
+
+ public static int getPartParentId(int partId, TajoConf tajoConf) {
+ return partId %
tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
+ }
+
+ private Path getDataFile(ExecutionBlockId ebId, int partId) throws
IOException {
+ try {
+ // the base dir for an output dir
+ String executionBlockBaseDir = ebId.getQueryId().toString() +
"/output" + "/" + ebId.getId() + "/hash-shuffle";
+ Path baseDirPath =
localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir,
systemConf));
+ //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
+
+ // If EB has many partition, too many shuffle file are in single
directory.
+ return StorageUtil.concatPath(baseDirPath, "" +
getPartParentId(partId, systemConf), "" + partId);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+ }
+
+ public Path getPartitionAppenderDataFile(ExecutionBlockId ebId, int
partId) {
--- End diff --
This method (getPartitionAppenderDataFile) appears to be unused.
> Reduce number of hash shuffle output file.
> ------------------------------------------
>
> Key: TAJO-992
> URL: https://issues.apache.org/jira/browse/TAJO-992
> Project: Tajo
> Issue Type: Sub-task
> Components: data shuffle
> Reporter: Hyoungjun Kim
> Assignee: Hyoungjun Kim
>
> Currently Tajo creates too many intermediate files in the case of hash
> shuffle. An execution block (SubQuery) on a TajoWorker creates intermediate
> files according to the following rule:
> (number of intermediate files in a worker) = (number of tasks / number of workers) * (number of partitions)
> This may cause 'too many open files' errors and makes it difficult to scale
> out. To solve this problem, we should reduce the number of hash shuffle output
> files.
--
This message was sent by Atlassian JIRA
(v6.2#6252)