http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
deleted file mode 100644
index 4efc3b7..0000000
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ /dev/null
@@ -1,1271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FileStorageManager extends StorageManager {
-  private final Log LOG = LogFactory.getLog(FileStorageManager.class);
-
-  static final String OUTPUT_FILE_PREFIX="part-";
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(2);
-          return fmt;
-        }
-      };
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(6);
-          return fmt;
-        }
-      };
-
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(3);
-          return fmt;
-        }
-      };
-
-  protected FileSystem fs;
-  protected Path tableBaseDir;
-  protected boolean blocksMetadataEnabled;
-  private static final HdfsVolumeId zeroVolumeId = new 
HdfsVolumeId(Bytes.toBytes(0));
-
-  public FileStorageManager(String storeType) {
-    super(storeType);
-  }
-
-  @Override
-  protected void storageInit() throws IOException {
-    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
-    this.fs = tableBaseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = 
conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled)
-      LOG.warn("does not support block metadata. 
('dfs.datanode.hdfs-blocks-metadata.enabled')");
-  }
-
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
-      throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    return getFileScanner(meta, schema, path, status);
-  }
-
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, 
FileStatus status)
-      throws IOException {
-    Fragment fragment = new FileFragment(path.getName(), path, 0, 
status.getLen());
-    return getScanner(meta, schema, fragment);
-  }
-
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
-  public Path getWarehouseDir() {
-    return this.tableBaseDir;
-  }
-
-  public void delete(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    fs.delete(tablePath, true);
-  }
-
-  public boolean exists(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    return fileSystem.exists(path);
-  }
-
-  /**
-   * This method deletes only data contained in the given path.
-   *
-   * @param path The path in which data are deleted.
-   * @throws IOException
-   */
-  public void deleteData(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    FileStatus[] fileLists = fileSystem.listStatus(path);
-    for (FileStatus status : fileLists) {
-      fileSystem.delete(status.getPath(), true);
-    }
-  }
-
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
-  }
-
-  private String partitionPath = "";
-  private int currentDepth = 0;
-
-  /**
-   * Set a specific partition path for partition-column only queries
-   * @param path The partition prefix path
-   */
-  public void setPartitionPath(String path) { partitionPath = path; }
-
-  /**
-   * Set a depth of partition path for partition-column only queries
-   * @param depth Depth of partitions
-   */
-  public void setCurrentDepth(int depth) { currentDepth = depth; }
-
-  @VisibleForTesting
-  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
-      throws IOException {
-    return getAppender(null, null, meta, schema, filePath);
-  }
-
-  public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, long fragmentSize) throws 
IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fragmentSize);
-  }
-
-  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException 
{
-    FileSystem fs = tablePath.getFileSystem(conf);
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, 
file.getLen());
-      listTablets.add(tablet);
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public FileFragment[] split(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, Path tablePath) throws 
IOException {
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  private FileFragment[] split(String tableName, Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, 
defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, 
remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, 
remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public static FileFragment[] splitNG(Configuration conf, String tableName, 
TableMeta meta,
-                                       Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, 
defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, 
remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, 
remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public long calculateSize(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    long totalSize = 0;
-
-    if (fs.exists(tablePath)) {
-      totalSize = fs.getContentSummary(tablePath).getLength();
-    }
-
-    return totalSize;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // FileInputFormat Area
-  /////////////////////////////////////////////////////////////////////////////
-  public Path getAppenderFilePath(TaskAttemptId taskAttemptId, Path workDir) {
-    if (taskAttemptId == null) {
-      // For testcase
-      return workDir;
-    }
-    // The final result of a task will be written in a file named 
part-ss-nnnnnnn,
-    // where ss is the stage id associated with this task, and nnnnnn is the 
task id.
-    Path outFilePath = StorageUtil.concatPath(workDir, 
TajoConstants.RESULT_DIR_NAME,
-        OUTPUT_FILE_PREFIX +
-            
OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId())
 + "-" +
-            
OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" +
-            OUTPUT_FILE_FORMAT_SEQ.get().format(0));
-    LOG.info("Output File Path: " + outFilePath);
-
-    return outFilePath;
-  }
-
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * hiddenFileFilter together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * List input directories.
-   * Subclasses may override to, e.g., select only files matching a regular
-   * expression.
-   *
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-
-    List<IOException> errors = new ArrayList<IOException>();
-
-    // creates a MultiPathFilter with the hiddenFileFilter and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(hiddenFileFilter);
-
-    PathFilter inputFilter = new MultiPathFilter(filters);
-
-    for (int i = 0; i < dirs.length; ++i) {
-      Path p = dirs[i];
-
-      FileSystem fs = p.getFileSystem(conf);
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat : matches) {
-          if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              result.add(stat);
-            }
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
-    }
-
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result;
-  }
-
-  /**
-   * Is the given filename splitable? Usually, true, but if the file is
-   * stream compressed, it will not be.
-   * <p/>
-   * <code>FileInputFormat</code> implementations can override this and return
-   * <code>false</code> to ensure that individual input files are never 
split-up
-   * so that Mappers process entire files.
-   *
-   *
-   * @param path the file name to check
-   * @param status get the file length
-   * @return is this file isSplittable?
-   */
-  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, 
FileStatus status) throws IOException {
-    Scanner scanner = getFileScanner(meta, schema, path, status);
-    boolean split = scanner.isSplittable();
-    scanner.close();
-    return split;
-  }
-
-  private static final double SPLIT_SLOP = 1.1;   // 10% slop
-
-  protected int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset) &&
-          (offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength())) {
-        return i;
-      }
-    }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset +
-        " is outside of file (0.." +
-        fileLength + ")");
-  }
-
-  /**
-   * A factory that makes the split for this class. It can be overridden
-   * by sub-classes to make sub-types
-   */
-  protected FileFragment makeSplit(String fragmentId, Path file, long start, 
long length) {
-    return new FileFragment(fragmentId, file, start, length);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, Path file, long start, 
long length,
-                                   String[] hosts) {
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation 
blockLocation)
-      throws IOException {
-    return new FileFragment(fragmentId, file, blockLocation);
-  }
-
-  // for Non Splittable. eg, compressed gzip TextFile
-  protected FileFragment makeNonSplit(String fragmentId, Path file, long 
start, long length,
-                                      BlockLocation[] blkLocations) throws 
IOException {
-
-    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
-    for (BlockLocation blockLocation : blkLocations) {
-      for (String host : blockLocation.getHosts()) {
-        if (hostsBlockMap.containsKey(host)) {
-          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
-        } else {
-          hostsBlockMap.put(host, 1);
-        }
-      }
-    }
-
-    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, 
Integer>>(hostsBlockMap.entrySet());
-    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
-      @Override
-      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, 
Integer> v2) {
-        return v1.getValue().compareTo(v2.getValue());
-      }
-    });
-
-    String[] hosts = new String[blkLocations[0].getHosts().length];
-
-    for (int i = 0; i < hosts.length; i++) {
-      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
-      hosts[i] = entry.getKey();
-    }
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  /**
-   * Get the minimum split size
-   *
-   * @return the minimum number of bytes that can be in a split
-   */
-  public long getMinSplitSize() {
-    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
-  }
-
-  /**
-   * Get Disk Ids by Volume Bytes
-   */
-  private int[] getDiskIds(VolumeId[] volumeIds) {
-    int[] diskIds = new int[volumeIds.length];
-    for (int i = 0; i < volumeIds.length; i++) {
-      int diskId = -1;
-      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
-        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
-      }
-      diskIds[i] = diskId;
-    }
-    return diskIds;
-  }
-
-  /**
-   * Generate the map of host and make them into Volume Ids.
-   *
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        if (diskIds.length > 0 && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-  /**
-   * Generate the list of files and make them into FileSplits.
-   *
-   * @throws IOException
-   */
-  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema 
schema, Path... inputs)
-      throws IOException {
-    // generate splits'
-
-    List<Fragment> splits = Lists.newArrayList();
-    List<Fragment> volumeSplits = Lists.newArrayList();
-    List<BlockLocation> blockLocations = Lists.newArrayList();
-
-    for (Path p : inputs) {
-      FileSystem fs = p.getFileSystem(conf);
-
-      ArrayList<FileStatus> files = Lists.newArrayList();
-      if (fs.isFile(p)) {
-        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
-      } else {
-        files.addAll(listStatus(p));
-      }
-
-      int previousSplitSize = splits.size();
-      for (FileStatus file : files) {
-        Path path = file.getPath();
-        long length = file.getLen();
-        if (length > 0) {
-          // Get locations of blocks of file
-          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, 
length);
-          boolean splittable = isSplittable(meta, schema, path, file);
-          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-
-            if (splittable) {
-              for (BlockLocation blockLocation : blkLocations) {
-                volumeSplits.add(makeSplit(tableName, path, blockLocation));
-              }
-              blockLocations.addAll(Arrays.asList(blkLocations));
-
-            } else { // Non splittable
-              long blockSize = blkLocations[0].getLength();
-              if (blockSize >= length) {
-                blockLocations.addAll(Arrays.asList(blkLocations));
-                for (BlockLocation blockLocation : blkLocations) {
-                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
-                }
-              } else {
-                splits.add(makeNonSplit(tableName, path, 0, length, 
blkLocations));
-              }
-            }
-
-          } else {
-            if (splittable) {
-
-              long minSize = Math.max(getMinSplitSize(), 1);
-
-              long blockSize = file.getBlockSize(); // s3n rest api contained 
block size but blockLocations is one
-              long splitSize = Math.max(minSize, blockSize);
-              long bytesRemaining = length;
-
-              // for s3
-              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
-                splits.add(makeSplit(tableName, path, length - bytesRemaining, 
splitSize,
-                    blkLocations[blkIndex].getHosts()));
-                bytesRemaining -= splitSize;
-              }
-              if (bytesRemaining > 0) {
-                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
-                splits.add(makeSplit(tableName, path, length - bytesRemaining, 
bytesRemaining,
-                    blkLocations[blkIndex].getHosts()));
-              }
-            } else { // Non splittable
-              splits.add(makeNonSplit(tableName, path, 0, length, 
blkLocations));
-            }
-          }
-        } else {
-          //for zero length files
-          splits.add(makeSplit(tableName, path, 0, length));
-        }
-      }
-      if(LOG.isDebugEnabled()){
-        LOG.debug("# of splits per partition: " + (splits.size() - 
previousSplitSize));
-      }
-    }
-
-    // Combine original fileFragments with new VolumeId information
-    setVolumeMeta(volumeSplits, blockLocations);
-    splits.addAll(volumeSplits);
-    LOG.info("Total # of splits: " + splits.size());
-    return splits;
-  }
-
-  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> 
blockLocations)
-      throws IOException {
-
-    int locationSize = blockLocations.size();
-    int splitSize = splits.size();
-    if (locationSize == 0 || splitSize == 0) return;
-
-    if (locationSize != splitSize) {
-      // splits and locations don't match up
-      LOG.warn("Number of block locations not equal to number of splits: "
-          + "#locations=" + locationSize
-          + " #splits=" + splitSize);
-      return;
-    }
-
-    DistributedFileSystem fs = 
(DistributedFileSystem)DistributedFileSystem.get(conf);
-    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, 
DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
-    int blockLocationIdx = 0;
-
-    Iterator<Fragment> iter = splits.iterator();
-    while (locationSize > blockLocationIdx) {
-
-      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
-      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, 
blockLocationIdx + subSize);
-      //BlockStorageLocation containing additional volume location information 
for each replica of each block.
-      BlockStorageLocation[] blockStorageLocations = 
fs.getFileBlockStorageLocations(locations);
-
-      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-        
((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
-        blockLocationIdx++;
-      }
-    }
-    LOG.info("# of splits with volumeId " + splitSize);
-  }
-
-  private static class InvalidInputException extends IOException {
-    List<IOException> errors;
-    public InvalidInputException(List<IOException> errors) {
-      this.errors = errors;
-    }
-
-    @Override
-    public String getMessage(){
-      StringBuffer sb = new StringBuffer();
-      int messageLimit = Math.min(errors.size(), 10);
-      for (int i = 0; i < messageLimit ; i ++) {
-        sb.append(errors.get(i).getMessage()).append("\n");
-      }
-
-      if(messageLimit < errors.size())
-        sb.append("skipped .....").append("\n");
-
-      return sb.toString();
-    }
-  }
-
-  @Override
-  public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode 
scanNode) throws IOException {
-    return getSplits(tableName, table.getMeta(), table.getSchema(), new 
Path(table.getPath()));
-  }
-
-  @Override
-  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws 
IOException {
-    if (!tableDesc.isExternal()) {
-      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
-      String databaseName = splitted[0];
-      String simpleTableName = splitted[1];
-
-      // create a table directory (i.e., 
${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
-      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, 
simpleTableName);
-      tableDesc.setPath(tablePath.toUri());
-    } else {
-      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION 
must be given.");
-    }
-
-    Path path = new Path(tableDesc.getPath());
-
-    FileSystem fs = path.getFileSystem(conf);
-    TableStats stats = new TableStats();
-    if (tableDesc.isExternal()) {
-      if (!fs.exists(path)) {
-        LOG.error(path.toUri() + " does not exist");
-        throw new IOException("ERROR: " + path.toUri() + " does not exist");
-      }
-    } else {
-      fs.mkdirs(path);
-    }
-
-    long totalSize = 0;
-
-    try {
-      totalSize = calculateSize(path);
-    } catch (IOException e) {
-      LOG.warn("Cannot calculate the size of the relation", e);
-    }
-
-    stats.setNumBytes(totalSize);
-
-    if (tableDesc.isExternal()) { // if it is an external table, there is no 
way to know the exact row number without processing.
-      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
-    }
-
-    tableDesc.setStats(stats);
-  }
-
-  @Override
-  public void purgeTable(TableDesc tableDesc) throws IOException {
-    try {
-      Path path = new Path(tableDesc.getPath());
-      FileSystem fs = path.getFileSystem(conf);
-      LOG.info("Delete table data dir: " + path);
-      fs.delete(path, true);
-    } catch (IOException e) {
-      throw new InternalError(e.getMessage());
-    }
-  }
-
-  @Override
-  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int 
currentPage, int numResultFragments) throws IOException {
-    // Listing table data file which is not empty.
-    // If the table is a partitioned table, return file list which has same 
partition key.
-    Path tablePath = new Path(tableDesc.getPath());
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    //In the case of partitioned table, we should return same partition key 
data files.
-    int partitionDepth = 0;
-    if (tableDesc.hasPartition()) {
-      partitionDepth = 
tableDesc.getPartitionMethod().getExpressionSchema().getRootColumns().size();
-    }
-
-    List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
-    if (fs.exists(tablePath)) {
-      if (!partitionPath.isEmpty()) {
-        Path partPath = new Path(tableDesc.getPath() + partitionPath);
-        if (fs.exists(partPath)) {
-          getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, 
currentPage, numResultFragments,
-                  new AtomicInteger(0), tableDesc.hasPartition(), 
this.currentDepth, partitionDepth);
-        }
-      } else {
-        getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, 
currentPage, numResultFragments,
-                new AtomicInteger(0), tableDesc.hasPartition(), 0, 
partitionDepth);
-      }
-    }
-
-    List<Fragment> fragments = new ArrayList<Fragment>();
-
-    String[] previousPartitionPathNames = null;
-    for (FileStatus eachFile: nonZeroLengthFiles) {
-      FileFragment fileFragment = new FileFragment(tableDesc.getName(), 
eachFile.getPath(), 0, eachFile.getLen(), null);
-
-      if (partitionDepth > 0) {
-        // finding partition key;
-        Path filePath = fileFragment.getPath();
-        Path parentPath = filePath;
-        String[] parentPathNames = new String[partitionDepth];
-        for (int i = 0; i < partitionDepth; i++) {
-          parentPath = parentPath.getParent();
-          parentPathNames[partitionDepth - i - 1] = parentPath.getName();
-        }
-
-        // If current partitionKey == previousPartitionKey, add to result.
-        if (previousPartitionPathNames == null) {
-          fragments.add(fileFragment);
-        } else if (previousPartitionPathNames != null && 
Arrays.equals(previousPartitionPathNames, parentPathNames)) {
-          fragments.add(fileFragment);
-        } else {
-          break;
-        }
-        previousPartitionPathNames = parentPathNames;
-      } else {
-        fragments.add(fileFragment);
-      }
-    }
-
-    return fragments;
-  }
-
-  /**
-   *
-   * @param fs
-   * @param path The table path
-   * @param result The final result files to be used
-   * @param startFileIndex
-   * @param numResultFiles
-   * @param currentFileIndex
-   * @param partitioned A flag to indicate if this table is partitioned
-   * @param currentDepth Current visiting depth of partition directories
-   * @param maxDepth The partition depth of this table
-   * @throws IOException
-   */
-  private void getNonZeroLengthDataFiles(FileSystem fs, Path path, 
List<FileStatus> result,
-                                                int startFileIndex, int 
numResultFiles,
-                                                AtomicInteger 
currentFileIndex, boolean partitioned,
-                                                int currentDepth, int 
maxDepth) throws IOException {
-    // Intermediate directory
-    if (fs.isDirectory(path)) {
-
-      FileStatus[] files = fs.listStatus(path, 
StorageManager.hiddenFileFilter);
-
-      if (files != null && files.length > 0) {
-
-        for (FileStatus eachFile : files) {
-
-          // checking if the enough number of files are found
-          if (result.size() >= numResultFiles) {
-            return;
-          }
-          if (eachFile.isDirectory()) {
-
-            getNonZeroLengthDataFiles(
-                fs,
-                eachFile.getPath(),
-                result,
-                startFileIndex,
-                numResultFiles,
-                currentFileIndex,
-                partitioned,
-                currentDepth + 1, // increment a visiting depth
-                maxDepth);
-
-            // if partitioned table, we should ignore files located in the 
intermediate directory.
-            // we can ensure that this file is in leaf directory if 
currentDepth == maxDepth.
-          } else if (eachFile.isFile() && eachFile.getLen() > 0 && 
(!partitioned || currentDepth == maxDepth)) {
-            if (currentFileIndex.get() >= startFileIndex) {
-              result.add(eachFile);
-            }
-            currentFileIndex.incrementAndGet();
-          }
-        }
-      }
-
-      // Files located in leaf directory
-    } else {
-      FileStatus fileStatus = fs.getFileStatus(path);
-      if (fileStatus != null && fileStatus.getLen() > 0) {
-        if (currentFileIndex.get() >= startFileIndex) {
-          result.add(fileStatus);
-        }
-        currentFileIndex.incrementAndGet();
-        if (result.size() >= numResultFiles) {
-          return;
-        }
-      }
-    }
-  }
-
-  @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty storageProperty = new StorageProperty();
-    storageProperty.setSortedInsert(false);
-    if (storeType.equalsIgnoreCase("RAW")) {
-      storageProperty.setSupportsInsertInto(false);
-    } else {
-      storageProperty.setSupportsInsertInto(true);
-    }
-
-    return storageProperty;
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
-  }
-
-  @Override
-  public void rollbackOutputCommit(LogicalNode node) throws IOException {
-  }
-
-  @Override
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId 
finalEbId, LogicalPlan plan,
-                               Schema schema, TableDesc tableDesc) throws 
IOException {
-    return commitOutputData(queryContext, true);
-  }
-
-  @Override
-  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, 
TableDesc tableDesc,
-                                          Schema inputSchema, SortSpec[] 
sortSpecs, TupleRange dataRange)
-      throws IOException {
-    return null;
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param conf The system property
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param path The data file path
-   * @return Scanner instance
-   * @throws java.io.IOException
-   */
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, Path path) throws 
IOException {
-
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    FileFragment fragment = new FileFragment(path.getName(), path, 0, 
status.getLen());
-
-    return TableSpaceManager.getSeekableScanner(conf, meta, schema, fragment, 
schema);
-  }
-
-  /**
-   * Finalizes result data. Tajo stores result data in the staging directory.
-   * If the query fails, clean up the staging directory.
-   * Otherwise the query is successful, move to the final directory from the 
staging directory.
-   *
-   * @param queryContext The query property
-   * @param changeFileSeq If true change result file name with max sequence.
-   * @return Saved path
-   * @throws java.io.IOException
-   */
-  protected Path commitOutputData(OverridableConf queryContext, boolean 
changeFileSeq) throws IOException {
-    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
-    Path stagingResultDir = new Path(stagingDir, 
TajoConstants.RESULT_DIR_NAME);
-    Path finalOutputDir;
-    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
-      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
-      try {
-        FileSystem fs = stagingResultDir.getFileSystem(conf);
-
-        if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // 
INSERT OVERWRITE INTO
-
-          // It moves the original table into the temporary location.
-          // Then it moves the new result table into the original table 
location.
-          // Upon failed, it recovers the original table if possible.
-          boolean movedToOldTable = false;
-          boolean committed = false;
-          Path oldTableDir = new Path(stagingDir, 
TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-          ContentSummary summary = fs.getContentSummary(stagingResultDir);
-
-          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && 
summary.getFileCount() > 0L) {
-            // This is a map for existing non-leaf directory to rename. A key 
is current directory and a value is
-            // renaming directory.
-            Map<Path, Path> renameDirs = TUtil.newHashMap();
-            // This is a map for recovering existing partition directory. A 
key is current directory and a value is
-            // temporary directory to back up.
-            Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
-            try {
-              if (!fs.exists(finalOutputDir)) {
-                fs.mkdirs(finalOutputDir);
-              }
-
-              visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, 
stagingResultDir.toString(),
-                  renameDirs, oldTableDir);
-
-              // Rename target partition directories
-              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                // Backup existing data files for recovering
-                if (fs.exists(entry.getValue())) {
-                  String recoveryPathString = 
entry.getValue().toString().replaceAll(finalOutputDir.toString(),
-                      oldTableDir.toString());
-                  Path recoveryPath = new Path(recoveryPathString);
-                  fs.rename(entry.getValue(), recoveryPath);
-                  fs.exists(recoveryPath);
-                  recoveryDirs.put(entry.getValue(), recoveryPath);
-                }
-                // Delete existing directory
-                fs.delete(entry.getValue(), true);
-                // Rename staging directory to final output directory
-                fs.rename(entry.getKey(), entry.getValue());
-              }
-
-            } catch (IOException ioe) {
-              // Remove created dirs
-              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                fs.delete(entry.getValue(), true);
-              }
-
-              // Recovery renamed dirs
-              for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
-                fs.delete(entry.getValue(), true);
-                fs.rename(entry.getValue(), entry.getKey());
-              }
-
-              throw new IOException(ioe.getMessage());
-            }
-          } else { // no partition
-            try {
-
-              // if the final output dir exists, move all contents to the 
temporary table dir.
-              // Otherwise, just make the final output dir. As a result, the 
final output dir will be empty.
-              if (fs.exists(finalOutputDir)) {
-                fs.mkdirs(oldTableDir);
-
-                for (FileStatus status : fs.listStatus(finalOutputDir, 
StorageManager.hiddenFileFilter)) {
-                  fs.rename(status.getPath(), oldTableDir);
-                }
-
-                movedToOldTable = fs.exists(oldTableDir);
-              } else { // if the parent does not exist, make its parent 
directory.
-                fs.mkdirs(finalOutputDir);
-              }
-
-              // Move the results to the final output dir.
-              for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                fs.rename(status.getPath(), finalOutputDir);
-              }
-
-              // Check the final output dir
-              committed = fs.exists(finalOutputDir);
-
-            } catch (IOException ioe) {
-              // recover the old table
-              if (movedToOldTable && !committed) {
-
-                // if commit is failed, recover the old data
-                for (FileStatus status : fs.listStatus(finalOutputDir, 
StorageManager.hiddenFileFilter)) {
-                  fs.delete(status.getPath(), true);
-                }
-
-                for (FileStatus status : fs.listStatus(oldTableDir)) {
-                  fs.rename(status.getPath(), finalOutputDir);
-                }
-              }
-
-              throw new IOException(ioe.getMessage());
-            }
-          }
-        } else {
-          String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
-
-          if (queryType != null && queryType.equals(NodeType.INSERT.name())) { 
// INSERT INTO an existing table
-
-            NumberFormat fmt = NumberFormat.getInstance();
-            fmt.setGroupingUsed(false);
-            fmt.setMinimumIntegerDigits(3);
-
-            if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
-              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                if (eachFile.isFile()) {
-                  LOG.warn("Partition table can't have file in a staging dir: 
" + eachFile.getPath());
-                  continue;
-                }
-                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, -1, changeFileSeq);
-              }
-            } else {
-              int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, 
false) + 1;
-              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                if (eachFile.getPath().getName().startsWith("_")) {
-                  continue;
-                }
-                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputDir, fmt, maxSeq++, changeFileSeq);
-              }
-            }
-            // checking all file moved and remove empty dir
-            verifyAllFileMoved(fs, stagingResultDir);
-            FileStatus[] files = fs.listStatus(stagingResultDir);
-            if (files != null && files.length != 0) {
-              for (FileStatus eachFile: files) {
-                LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
-              }
-            }
-          } else { // CREATE TABLE AS SELECT (CTAS)
-            if (fs.exists(finalOutputDir)) {
-              for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                fs.rename(status.getPath(), finalOutputDir);
-              }
-            } else {
-              fs.rename(stagingResultDir, finalOutputDir);
-            }
-            LOG.info("Moved from the staging dir to the output directory '" + 
finalOutputDir);
-          }
-        }
-
-        // remove the staging directory if the final output dir is given.
-        Path stagingDirRoot = stagingDir.getParent();
-        fs.delete(stagingDirRoot, true);
-      } catch (Throwable t) {
-        LOG.error(t);
-        throw new IOException(t);
-      }
-    } else {
-      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-    }
-
-    return finalOutputDir;
-  }
-
-  /**
-   * Attach the sequence number to the output file name and than move the file 
into the final result path.
-   *
-   * @param fs FileSystem
-   * @param stagingResultDir The staging result dir
-   * @param fileStatus The file status
-   * @param finalOutputPath Final output path
-   * @param nf Number format
-   * @param fileSeq The sequence number
-   * @throws java.io.IOException
-   */
-  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
-                                          FileStatus fileStatus, Path 
finalOutputPath,
-                                          NumberFormat nf,
-                                          int fileSeq, boolean changeFileSeq) 
throws IOException {
-    if (fileStatus.isDirectory()) {
-      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-      if (subPath != null) {
-        Path finalSubPath = new Path(finalOutputPath, subPath);
-        if (!fs.exists(finalSubPath)) {
-          fs.mkdirs(finalSubPath);
-        }
-        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
-        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
-          if (eachFile.getPath().getName().startsWith("_")) {
-            continue;
-          }
-          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, 
finalOutputPath, nf, ++maxSeq, changeFileSeq);
-        }
-      } else {
-        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + 
fileStatus.getPath());
-      }
-    } else {
-      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-      if (subPath != null) {
-        Path finalSubPath = new Path(finalOutputPath, subPath);
-        if (changeFileSeq) {
-          finalSubPath = new Path(finalSubPath.getParent(), 
replaceFileNameSeq(finalSubPath, fileSeq, nf));
-        }
-        if (!fs.exists(finalSubPath.getParent())) {
-          fs.mkdirs(finalSubPath.getParent());
-        }
-        if (fs.exists(finalSubPath)) {
-          throw new IOException("Already exists data file:" + finalSubPath);
-        }
-        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
-        if (success) {
-          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
-              "to final output[" + finalSubPath + "]");
-        } else {
-          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " 
+
-              "to final output[" + finalSubPath + "]");
-        }
-      }
-    }
-  }
-
-  /**
-   * Removes the path of the parent.
-   * @param parentPath
-   * @param childPath
-   * @return
-   */
-  private String extractSubPath(Path parentPath, Path childPath) {
-    String parentPathStr = parentPath.toUri().getPath();
-    String childPathStr = childPath.toUri().getPath();
-
-    if (parentPathStr.length() > childPathStr.length()) {
-      return null;
-    }
-
-    int index = childPathStr.indexOf(parentPathStr);
-    if (index != 0) {
-      return null;
-    }
-
-    return childPathStr.substring(parentPathStr.length() + 1);
-  }
-
-  /**
-   * Attach the sequence number to a path.
-   *
-   * @param path Path
-   * @param seq sequence number
-   * @param nf Number format
-   * @return New path attached with sequence number
-   * @throws java.io.IOException
-   */
-  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) 
throws IOException {
-    String[] tokens = path.getName().split("-");
-    if (tokens.length != 4) {
-      throw new IOException("Wrong result file name:" + path);
-    }
-    return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + 
nf.format(seq);
-  }
-
-  /**
-   * Make sure all files are moved.
-   * @param fs FileSystem
-   * @param stagingPath The stagind directory
-   * @return
-   * @throws java.io.IOException
-   */
-  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws 
IOException {
-    FileStatus[] files = fs.listStatus(stagingPath);
-    if (files != null && files.length != 0) {
-      for (FileStatus eachFile: files) {
-        if (eachFile.isFile()) {
-          LOG.error("There are some unmoved files in staging dir:" + 
eachFile.getPath());
-          return false;
-        } else {
-          if (verifyAllFileMoved(fs, eachFile.getPath())) {
-            fs.delete(eachFile.getPath(), false);
-          } else {
-            return false;
-          }
-        }
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * This method sets a rename map which includes renamed staging directory to 
final output directory recursively.
-   * If there exists some data files, this delete it for duplicate data.
-   *
-   *
-   * @param fs
-   * @param stagingPath
-   * @param outputPath
-   * @param stagingParentPathString
-   * @throws java.io.IOException
-   */
-  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path 
outputPath,
-                                         String stagingParentPathString,
-                                         Map<Path, Path> renameDirs, Path 
oldTableDir) throws IOException {
-    FileStatus[] files = fs.listStatus(stagingPath);
-
-    for(FileStatus eachFile : files) {
-      if (eachFile.isDirectory()) {
-        Path oldPath = eachFile.getPath();
-
-        // Make recover directory.
-        String recoverPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
-            oldTableDir.toString());
-        Path recoveryPath = new Path(recoverPathString);
-        if (!fs.exists(recoveryPath)) {
-          fs.mkdirs(recoveryPath);
-        }
-
-        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, 
stagingParentPathString,
-            renameDirs, oldTableDir);
-        // Find last order partition for renaming
-        String newPathString = 
oldPath.toString().replaceAll(stagingParentPathString,
-            outputPath.toString());
-        Path newPath = new Path(newPathString);
-        if (!isLeafDirectory(fs, eachFile.getPath())) {
-          renameDirs.put(eachFile.getPath(), newPath);
-        } else {
-          if (!fs.exists(newPath)) {
-            fs.mkdirs(newPath);
-          }
-        }
-      }
-    }
-  }
-
-  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException 
{
-    boolean retValue = false;
-
-    FileStatus[] files = fs.listStatus(path);
-    for (FileStatus file : files) {
-      if (fs.isDirectory(file.getPath())) {
-        retValue = true;
-        break;
-      }
-    }
-
-    return retValue;
-  }
-}

Reply via email to