http://git-wip-us.apache.org/repos/asf/tajo/blob/87c957e4/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --cc 
tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
index ee24b27,0000000..442ed5e
mode 100644,000000..100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@@ -1,831 -1,0 +1,832 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.storage;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Lists;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.*;
 +import org.apache.hadoop.hdfs.DFSConfigKeys;
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.tajo.*;
 +import org.apache.tajo.catalog.*;
 +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 +import org.apache.tajo.catalog.statistics.TableStats;
 +import org.apache.tajo.conf.TajoConf;
 +import org.apache.tajo.plan.logical.LogicalNode;
 +import org.apache.tajo.plan.logical.ScanNode;
 +import org.apache.tajo.storage.fragment.FileFragment;
 +import org.apache.tajo.storage.fragment.Fragment;
 +import org.apache.tajo.util.*;
 +
 +import java.io.IOException;
 +import java.text.NumberFormat;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +public class FileStorageManager extends StorageManager {
 +  private final Log LOG = LogFactory.getLog(FileStorageManager.class);
 +
 +  static final String OUTPUT_FILE_PREFIX="part-";
 +  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
 +      new ThreadLocal<NumberFormat>() {
 +        @Override
 +        public NumberFormat initialValue() {
 +          NumberFormat fmt = NumberFormat.getInstance();
 +          fmt.setGroupingUsed(false);
 +          fmt.setMinimumIntegerDigits(2);
 +          return fmt;
 +        }
 +      };
 +  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
 +      new ThreadLocal<NumberFormat>() {
 +        @Override
 +        public NumberFormat initialValue() {
 +          NumberFormat fmt = NumberFormat.getInstance();
 +          fmt.setGroupingUsed(false);
 +          fmt.setMinimumIntegerDigits(6);
 +          return fmt;
 +        }
 +      };
 +
 +  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
 +      new ThreadLocal<NumberFormat>() {
 +        @Override
 +        public NumberFormat initialValue() {
 +          NumberFormat fmt = NumberFormat.getInstance();
 +          fmt.setGroupingUsed(false);
 +          fmt.setMinimumIntegerDigits(3);
 +          return fmt;
 +        }
 +      };
 +
 +  protected FileSystem fs;
 +  protected Path tableBaseDir;
 +  protected boolean blocksMetadataEnabled;
 +  private static final HdfsVolumeId zeroVolumeId = new 
HdfsVolumeId(Bytes.toBytes(0));
 +
 +  public FileStorageManager(StoreType storeType) {
 +    super(storeType);
 +  }
 +
 +  @Override
 +  protected void storageInit() throws IOException {
 +    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
 +    this.fs = tableBaseDir.getFileSystem(conf);
 +    this.blocksMetadataEnabled = 
conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
 +        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
 +    if (!this.blocksMetadataEnabled)
 +      LOG.warn("does not support block metadata. 
('dfs.datanode.hdfs-blocks-metadata.enabled')");
 +  }
 +
 +  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
 +      throws IOException {
 +    FileSystem fs = path.getFileSystem(conf);
 +    FileStatus status = fs.getFileStatus(path);
 +    return getFileScanner(meta, schema, path, status);
 +  }
 +
 +  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, 
FileStatus status)
 +      throws IOException {
 +    Fragment fragment = new FileFragment(path.getName(), path, 0, 
status.getLen());
 +    return getScanner(meta, schema, fragment);
 +  }
 +
 +  public FileSystem getFileSystem() {
 +    return this.fs;
 +  }
 +
 +  public Path getWarehouseDir() {
 +    return this.tableBaseDir;
 +  }
 +
 +  public void delete(Path tablePath) throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    fs.delete(tablePath, true);
 +  }
 +
 +  public boolean exists(Path path) throws IOException {
 +    FileSystem fileSystem = path.getFileSystem(conf);
 +    return fileSystem.exists(path);
 +  }
 +
 +  /**
 +   * This method deletes only data contained in the given path.
 +   *
 +   * @param path The path in which data are deleted.
 +   * @throws IOException
 +   */
 +  public void deleteData(Path path) throws IOException {
 +    FileSystem fileSystem = path.getFileSystem(conf);
 +    FileStatus[] fileLists = fileSystem.listStatus(path);
 +    for (FileStatus status : fileLists) {
 +      fileSystem.delete(status.getPath(), true);
 +    }
 +  }
 +
 +  public Path getTablePath(String tableName) {
 +    return new Path(tableBaseDir, tableName);
 +  }
 +
 +  @VisibleForTesting
 +  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
 +      throws IOException {
 +    return getAppender(null, null, meta, schema, filePath);
 +  }
 +
 +  public FileFragment[] split(String tableName) throws IOException {
 +    Path tablePath = new Path(tableBaseDir, tableName);
 +    return split(tableName, tablePath, fs.getDefaultBlockSize());
 +  }
 +
 +  public FileFragment[] split(String tableName, long fragmentSize) throws 
IOException {
 +    Path tablePath = new Path(tableBaseDir, tableName);
 +    return split(tableName, tablePath, fragmentSize);
 +  }
 +
 +  public FileFragment[] splitBroadcastTable(Path tablePath) throws 
IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    List<FileFragment> listTablets = new ArrayList<FileFragment>();
 +    FileFragment tablet;
 +
 +    FileStatus[] fileLists = fs.listStatus(tablePath);
 +    for (FileStatus file : fileLists) {
 +      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, 
file.getLen());
 +      listTablets.add(tablet);
 +    }
 +
 +    FileFragment[] tablets = new FileFragment[listTablets.size()];
 +    listTablets.toArray(tablets);
 +
 +    return tablets;
 +  }
 +
 +  public FileFragment[] split(Path tablePath) throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
 +  }
 +
 +  public FileFragment[] split(String tableName, Path tablePath) throws 
IOException {
 +    return split(tableName, tablePath, fs.getDefaultBlockSize());
 +  }
 +
 +  private FileFragment[] split(String tableName, Path tablePath, long size)
 +      throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +
 +    long defaultBlockSize = size;
 +    List<FileFragment> listTablets = new ArrayList<FileFragment>();
 +    FileFragment tablet;
 +
 +    FileStatus[] fileLists = fs.listStatus(tablePath);
 +    for (FileStatus file : fileLists) {
 +      long remainFileSize = file.getLen();
 +      long start = 0;
 +      if (remainFileSize > defaultBlockSize) {
 +        while (remainFileSize > defaultBlockSize) {
 +          tablet = new FileFragment(tableName, file.getPath(), start, 
defaultBlockSize);
 +          listTablets.add(tablet);
 +          start += defaultBlockSize;
 +          remainFileSize -= defaultBlockSize;
 +        }
 +        listTablets.add(new FileFragment(tableName, file.getPath(), start, 
remainFileSize));
 +      } else {
 +        listTablets.add(new FileFragment(tableName, file.getPath(), 0, 
remainFileSize));
 +      }
 +    }
 +
 +    FileFragment[] tablets = new FileFragment[listTablets.size()];
 +    listTablets.toArray(tablets);
 +
 +    return tablets;
 +  }
 +
 +  public static FileFragment[] splitNG(Configuration conf, String tableName, 
TableMeta meta,
 +                                       Path tablePath, long size)
 +      throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +
 +    long defaultBlockSize = size;
 +    List<FileFragment> listTablets = new ArrayList<FileFragment>();
 +    FileFragment tablet;
 +
 +    FileStatus[] fileLists = fs.listStatus(tablePath);
 +    for (FileStatus file : fileLists) {
 +      long remainFileSize = file.getLen();
 +      long start = 0;
 +      if (remainFileSize > defaultBlockSize) {
 +        while (remainFileSize > defaultBlockSize) {
 +          tablet = new FileFragment(tableName, file.getPath(), start, 
defaultBlockSize);
 +          listTablets.add(tablet);
 +          start += defaultBlockSize;
 +          remainFileSize -= defaultBlockSize;
 +        }
 +        listTablets.add(new FileFragment(tableName, file.getPath(), start, 
remainFileSize));
 +      } else {
 +        listTablets.add(new FileFragment(tableName, file.getPath(), 0, 
remainFileSize));
 +      }
 +    }
 +
 +    FileFragment[] tablets = new FileFragment[listTablets.size()];
 +    listTablets.toArray(tablets);
 +
 +    return tablets;
 +  }
 +
 +  public long calculateSize(Path tablePath) throws IOException {
 +    FileSystem fs = tablePath.getFileSystem(conf);
 +    long totalSize = 0;
 +
 +    if (fs.exists(tablePath)) {
 +      totalSize = fs.getContentSummary(tablePath).getLength();
 +    }
 +
 +    return totalSize;
 +  }
 +
 +  
/////////////////////////////////////////////////////////////////////////////
 +  // FileInputFormat Area
 +  
/////////////////////////////////////////////////////////////////////////////
 +
 +  public static final PathFilter hiddenFileFilter = new PathFilter() {
 +    public boolean accept(Path p) {
 +      String name = p.getName();
 +      return !name.startsWith("_") && !name.startsWith(".");
 +    }
 +  };
 +
 +  public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path 
workDir) {
 +    if (taskAttemptId == null) {
 +      // For testcase
 +      return workDir;
 +    }
 +    // The final result of a task will be written in a file named 
part-ss-nnnnnnn,
 +    // where ss is the subquery id associated with this task, and nnnnnn is 
the task id.
 +    Path outFilePath = StorageUtil.concatPath(workDir, 
TajoConstants.RESULT_DIR_NAME,
 +        OUTPUT_FILE_PREFIX +
 +            
OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId())
 + "-" +
 +            
OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + 
"-" +
 +            OUTPUT_FILE_FORMAT_SEQ.get().format(0));
 +    LOG.info("Output File Path: " + outFilePath);
 +
 +    return outFilePath;
 +  }
 +
 +  /**
 +   * Proxy PathFilter that accepts a path only if all filters given in the
 +   * constructor do. Used by the listPaths() to apply the built-in
 +   * hiddenFileFilter together with a user provided one (if any).
 +   */
 +  private static class MultiPathFilter implements PathFilter {
 +    private List<PathFilter> filters;
 +
 +    public MultiPathFilter(List<PathFilter> filters) {
 +      this.filters = filters;
 +    }
 +
 +    public boolean accept(Path path) {
 +      for (PathFilter filter : filters) {
 +        if (!filter.accept(path)) {
 +          return false;
 +        }
 +      }
 +      return true;
 +    }
 +  }
 +
 +  /**
 +   * List input directories.
 +   * Subclasses may override to, e.g., select only files matching a regular
 +   * expression.
 +   *
 +   * @return array of FileStatus objects
 +   * @throws IOException if zero items.
 +   */
 +  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
 +    List<FileStatus> result = new ArrayList<FileStatus>();
 +    if (dirs.length == 0) {
 +      throw new IOException("No input paths specified in job");
 +    }
 +
 +    List<IOException> errors = new ArrayList<IOException>();
 +
 +    // creates a MultiPathFilter with the hiddenFileFilter and the
 +    // user provided one (if any).
 +    List<PathFilter> filters = new ArrayList<PathFilter>();
 +    filters.add(hiddenFileFilter);
 +
 +    PathFilter inputFilter = new MultiPathFilter(filters);
 +
 +    for (int i = 0; i < dirs.length; ++i) {
 +      Path p = dirs[i];
 +
 +      FileSystem fs = p.getFileSystem(conf);
 +      FileStatus[] matches = fs.globStatus(p, inputFilter);
 +      if (matches == null) {
 +        errors.add(new IOException("Input path does not exist: " + p));
 +      } else if (matches.length == 0) {
 +        errors.add(new IOException("Input Pattern " + p + " matches 0 
files"));
 +      } else {
 +        for (FileStatus globStat : matches) {
 +          if (globStat.isDirectory()) {
 +            for (FileStatus stat : fs.listStatus(globStat.getPath(),
 +                inputFilter)) {
 +              result.add(stat);
 +            }
 +          } else {
 +            result.add(globStat);
 +          }
 +        }
 +      }
 +    }
 +
 +    if (!errors.isEmpty()) {
 +      throw new InvalidInputException(errors);
 +    }
 +    LOG.info("Total input paths to process : " + result.size());
 +    return result;
 +  }
 +
 +  /**
 +   * Is the given filename splitable? Usually, true, but if the file is
 +   * stream compressed, it will not be.
 +   * <p/>
 +   * <code>FileInputFormat</code> implementations can override this and return
 +   * <code>false</code> to ensure that individual input files are never 
split-up
 +   * so that Mappers process entire files.
 +   *
 +   *
 +   * @param path the file name to check
 +   * @param status get the file length
 +   * @return is this file isSplittable?
 +   */
 +  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, 
FileStatus status) throws IOException {
 +    Scanner scanner = getFileScanner(meta, schema, path, status);
 +    boolean split = scanner.isSplittable();
 +    scanner.close();
 +    return split;
 +  }
 +
 +  private static final double SPLIT_SLOP = 1.1;   // 10% slop
 +
 +  protected int getBlockIndex(BlockLocation[] blkLocations,
 +                              long offset) {
 +    for (int i = 0; i < blkLocations.length; i++) {
 +      // is the offset inside this block?
 +      if ((blkLocations[i].getOffset() <= offset) &&
 +          (offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength())) {
 +        return i;
 +      }
 +    }
 +    BlockLocation last = blkLocations[blkLocations.length - 1];
 +    long fileLength = last.getOffset() + last.getLength() - 1;
 +    throw new IllegalArgumentException("Offset " + offset +
 +        " is outside of file (0.." +
 +        fileLength + ")");
 +  }
 +
 +  /**
 +   * A factory that makes the split for this class. It can be overridden
 +   * by sub-classes to make sub-types
 +   */
 +  protected FileFragment makeSplit(String fragmentId, Path file, long start, 
long length) {
 +    return new FileFragment(fragmentId, file, start, length);
 +  }
 +
 +  protected FileFragment makeSplit(String fragmentId, Path file, long start, 
long length,
 +                                   String[] hosts) {
 +    return new FileFragment(fragmentId, file, start, length, hosts);
 +  }
 +
 +  protected FileFragment makeSplit(String fragmentId, Path file, 
BlockLocation blockLocation)
 +      throws IOException {
 +    return new FileFragment(fragmentId, file, blockLocation);
 +  }
 +
 +  // for Non Splittable. eg, compressed gzip TextFile
 +  protected FileFragment makeNonSplit(String fragmentId, Path file, long 
start, long length,
 +                                      BlockLocation[] blkLocations) throws 
IOException {
 +
 +    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
 +    for (BlockLocation blockLocation : blkLocations) {
 +      for (String host : blockLocation.getHosts()) {
 +        if (hostsBlockMap.containsKey(host)) {
 +          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
 +        } else {
 +          hostsBlockMap.put(host, 1);
 +        }
 +      }
 +    }
 +
 +    List<Map.Entry<String, Integer>> entries = new 
ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
 +    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
 +
 +      @Override
 +      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, 
Integer> v2) {
 +        return v1.getValue().compareTo(v2.getValue());
 +      }
 +    });
 +
 +    String[] hosts = new String[blkLocations[0].getHosts().length];
 +
 +    for (int i = 0; i < hosts.length; i++) {
 +      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - 
i);
 +      hosts[i] = entry.getKey();
 +    }
 +    return new FileFragment(fragmentId, file, start, length, hosts);
 +  }
 +
 +  /**
 +   * Get the minimum split size
 +   *
 +   * @return the minimum number of bytes that can be in a split
 +   */
 +  public long getMinSplitSize() {
 +    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
 +  }
 +
 +  /**
 +   * Get Disk Ids by Volume Bytes
 +   */
 +  private int[] getDiskIds(VolumeId[] volumeIds) {
 +    int[] diskIds = new int[volumeIds.length];
 +    for (int i = 0; i < volumeIds.length; i++) {
 +      int diskId = -1;
 +      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
 +        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
 +      }
 +      diskIds[i] = diskId;
 +    }
 +    return diskIds;
 +  }
 +
 +  /**
 +   * Generate the map of host and make them into Volume Ids.
 +   *
 +   */
 +  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
 +    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
 +    for (FileFragment frag : frags) {
 +      String[] hosts = frag.getHosts();
 +      int[] diskIds = frag.getDiskIds();
 +      for (int i = 0; i < hosts.length; i++) {
 +        Set<Integer> volumeList = volumeMap.get(hosts[i]);
 +        if (volumeList == null) {
 +          volumeList = new HashSet<Integer>();
 +          volumeMap.put(hosts[i], volumeList);
 +        }
 +
 +        if (diskIds.length > 0 && diskIds[i] > -1) {
 +          volumeList.add(diskIds[i]);
 +        }
 +      }
 +    }
 +
 +    return volumeMap;
 +  }
 +  /**
 +   * Generate the list of files and make them into FileSplits.
 +   *
 +   * @throws IOException
 +   */
 +  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema 
schema, Path... inputs)
 +      throws IOException {
 +    // generate splits'
 +
 +    List<Fragment> splits = Lists.newArrayList();
 +    List<Fragment> volumeSplits = Lists.newArrayList();
 +    List<BlockLocation> blockLocations = Lists.newArrayList();
 +
 +    for (Path p : inputs) {
 +      FileSystem fs = p.getFileSystem(conf);
 +      ArrayList<FileStatus> files = Lists.newArrayList();
 +      if (fs.isFile(p)) {
 +        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
 +      } else {
 +        files.addAll(listStatus(p));
 +      }
 +
 +      int previousSplitSize = splits.size();
 +      for (FileStatus file : files) {
 +        Path path = file.getPath();
 +        long length = file.getLen();
 +        if (length > 0) {
 +          // Get locations of blocks of file
 +          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, 
length);
 +          boolean splittable = isSplittable(meta, schema, path, file);
 +          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
 +
 +            if (splittable) {
 +              for (BlockLocation blockLocation : blkLocations) {
 +                volumeSplits.add(makeSplit(tableName, path, blockLocation));
 +              }
 +              blockLocations.addAll(Arrays.asList(blkLocations));
 +
 +            } else { // Non splittable
 +              long blockSize = blkLocations[0].getLength();
 +              if (blockSize >= length) {
 +                blockLocations.addAll(Arrays.asList(blkLocations));
 +                for (BlockLocation blockLocation : blkLocations) {
 +                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
 +                }
 +              } else {
 +                splits.add(makeNonSplit(tableName, path, 0, length, 
blkLocations));
 +              }
 +            }
 +
 +          } else {
 +            if (splittable) {
 +
 +              long minSize = Math.max(getMinSplitSize(), 1);
 +
 +              long blockSize = file.getBlockSize(); // s3n rest api contained 
block size but blockLocations is one
 +              long splitSize = Math.max(minSize, blockSize);
 +              long bytesRemaining = length;
 +
 +              // for s3
 +              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
 +                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
 +                splits.add(makeSplit(tableName, path, length - 
bytesRemaining, splitSize,
 +                    blkLocations[blkIndex].getHosts()));
 +                bytesRemaining -= splitSize;
 +              }
 +              if (bytesRemaining > 0) {
 +                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
 +                splits.add(makeSplit(tableName, path, length - 
bytesRemaining, bytesRemaining,
 +                    blkLocations[blkIndex].getHosts()));
 +              }
 +            } else { // Non splittable
 +              splits.add(makeNonSplit(tableName, path, 0, length, 
blkLocations));
 +            }
 +          }
 +        } else {
 +          //for zero length files
 +          splits.add(makeSplit(tableName, path, 0, length));
 +        }
 +      }
 +      if(LOG.isDebugEnabled()){
 +        LOG.debug("# of splits per partition: " + (splits.size() - 
previousSplitSize));
 +      }
 +    }
 +
 +    // Combine original fileFragments with new VolumeId information
 +    setVolumeMeta(volumeSplits, blockLocations);
 +    splits.addAll(volumeSplits);
 +    LOG.info("Total # of splits: " + splits.size());
 +    return splits;
 +  }
 +
 +  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> 
blockLocations)
 +      throws IOException {
 +
 +    int locationSize = blockLocations.size();
 +    int splitSize = splits.size();
 +    if (locationSize == 0 || splitSize == 0) return;
 +
 +    if (locationSize != splitSize) {
 +      // splits and locations don't match up
 +      LOG.warn("Number of block locations not equal to number of splits: "
 +          + "#locations=" + locationSize
 +          + " #splits=" + splitSize);
 +      return;
 +    }
 +
 +    DistributedFileSystem fs = 
(DistributedFileSystem)DistributedFileSystem.get(conf);
 +    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, 
DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
 +    int blockLocationIdx = 0;
 +
 +    Iterator<Fragment> iter = splits.iterator();
 +    while (locationSize > blockLocationIdx) {
 +
 +      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
 +      List<BlockLocation> locations = 
blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
 +      //BlockStorageLocation containing additional volume location 
information for each replica of each block.
 +      BlockStorageLocation[] blockStorageLocations = 
fs.getFileBlockStorageLocations(locations);
 +
 +      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) 
{
 +        
((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
 +        blockLocationIdx++;
 +      }
 +    }
 +    LOG.info("# of splits with volumeId " + splitSize);
 +  }
 +
 +  private static class InvalidInputException extends IOException {
 +    List<IOException> errors;
 +    public InvalidInputException(List<IOException> errors) {
 +      this.errors = errors;
 +    }
 +
 +    @Override
 +    public String getMessage(){
 +      StringBuffer sb = new StringBuffer();
 +      int messageLimit = Math.min(errors.size(), 10);
 +      for (int i = 0; i < messageLimit ; i ++) {
 +        sb.append(errors.get(i).getMessage()).append("\n");
 +      }
 +
 +      if(messageLimit < errors.size())
 +        sb.append("skipped .....").append("\n");
 +
 +      return sb.toString();
 +    }
 +  }
 +
 +  @Override
 +  public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode 
scanNode) throws IOException {
-     return getSplits(tableName, table.getMeta(), table.getSchema(), 
table.getPath());
++    return getSplits(tableName, table.getMeta(), table.getSchema(), new 
Path(table.getPath()));
 +  }
 +
 +  @Override
 +  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws 
IOException {
 +    if (!tableDesc.isExternal()) {
 +      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
 +      String databaseName = splitted[0];
 +      String simpleTableName = splitted[1];
 +
 +      // create a table directory (i.e., 
${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
 +      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, 
simpleTableName);
-       tableDesc.setPath(tablePath);
++      tableDesc.setPath(tablePath.toUri());
 +    } else {
 +      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION 
must be given.");
 +    }
 +
-     Path path = tableDesc.getPath();
++    Path path = new Path(tableDesc.getPath());
 +
 +    FileSystem fs = path.getFileSystem(conf);
 +    TableStats stats = new TableStats();
 +    if (tableDesc.isExternal()) {
 +      if (!fs.exists(path)) {
 +        LOG.error(path.toUri() + " does not exist");
 +        throw new IOException("ERROR: " + path.toUri() + " does not exist");
 +      }
 +    } else {
 +      fs.mkdirs(path);
 +    }
 +
 +    long totalSize = 0;
 +
 +    try {
 +      totalSize = calculateSize(path);
 +    } catch (IOException e) {
 +      LOG.warn("Cannot calculate the size of the relation", e);
 +    }
 +
 +    stats.setNumBytes(totalSize);
 +
 +    if (tableDesc.isExternal()) { // if it is an external table, there is no 
way to know the exact row number without processing.
 +      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
 +    }
 +
 +    tableDesc.setStats(stats);
 +  }
 +
 +  @Override
 +  public void purgeTable(TableDesc tableDesc) throws IOException {
 +    try {
-       Path path = tableDesc.getPath();
++      Path path = new Path(tableDesc.getPath());
 +      FileSystem fs = path.getFileSystem(conf);
 +      LOG.info("Delete table data dir: " + path);
 +      fs.delete(path, true);
 +    } catch (IOException e) {
 +      throw new InternalError(e.getMessage());
 +    }
 +  }
 +
 +  @Override
 +  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int 
currentPage, int numFragments) throws IOException {
 +    // Listing table data file which is not empty.
 +    // If the table is a partitioned table, return file list which has same 
partition key.
-     FileSystem fs = tableDesc.getPath().getFileSystem(conf);
++    Path tablePath = new Path(tableDesc.getPath());
++    FileSystem fs = tablePath.getFileSystem(conf);
 +
 +    List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
-     if (fs.exists(tableDesc.getPath())) {
-       getNonZeroLengthDataFiles(fs, tableDesc.getPath(), nonZeroLengthFiles, 
currentPage, numFragments,
++    if (fs.exists(tablePath)) {
++      getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, 
currentPage, numFragments,
 +          new AtomicInteger(0));
 +    }
 +
 +    List<Fragment> fragments = new ArrayList<Fragment>();
 +
 +    //In the case of partitioned table, return same partition key data files.
 +    int numPartitionColumns = 0;
 +    if (tableDesc.hasPartition()) {
 +      numPartitionColumns = 
tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
 +    }
 +    String[] previousPartitionPathNames = null;
 +    for (FileStatus eachFile: nonZeroLengthFiles) {
 +      FileFragment fileFragment = new FileFragment(tableDesc.getName(), 
eachFile.getPath(), 0, eachFile.getLen(), null);
 +
 +      if (numPartitionColumns > 0) {
 +        // finding partition key;
 +        Path filePath = fileFragment.getPath();
 +        Path parentPath = filePath;
 +        String[] parentPathNames = new String[numPartitionColumns];
 +        for (int i = 0; i < numPartitionColumns; i++) {
 +          parentPath = parentPath.getParent();
 +          parentPathNames[numPartitionColumns - i - 1] = parentPath.getName();
 +        }
 +
 +        // If current partitionKey == previousPartitionKey, add to result.
 +        if (previousPartitionPathNames == null) {
 +          fragments.add(fileFragment);
 +        } else if (previousPartitionPathNames != null && 
Arrays.equals(previousPartitionPathNames, parentPathNames)) {
 +          fragments.add(fileFragment);
 +        } else {
 +          break;
 +        }
 +        previousPartitionPathNames = parentPathNames;
 +      } else {
 +        fragments.add(fileFragment);
 +      }
 +    }
 +
 +    return fragments;
 +  }
 +
 +  private void getNonZeroLengthDataFiles(FileSystem fs, Path path, 
List<FileStatus> result,
 +                                                int startFileIndex, int 
numResultFiles,
 +                                                AtomicInteger 
currentFileIndex) throws IOException {
 +    if (fs.isDirectory(path)) {
 +      FileStatus[] files = fs.listStatus(path, 
FileStorageManager.hiddenFileFilter);
 +      if (files != null && files.length > 0) {
 +        for (FileStatus eachFile : files) {
 +          if (result.size() >= numResultFiles) {
 +            return;
 +          }
 +          if (eachFile.isDirectory()) {
 +            getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, 
startFileIndex, numResultFiles,
 +                currentFileIndex);
 +          } else if (eachFile.isFile() && eachFile.getLen() > 0) {
 +            if (currentFileIndex.get() >= startFileIndex) {
 +              result.add(eachFile);
 +            }
 +            currentFileIndex.incrementAndGet();
 +          }
 +        }
 +      }
 +    } else {
 +      FileStatus fileStatus = fs.getFileStatus(path);
 +      if (fileStatus != null && fileStatus.getLen() > 0) {
 +        if (currentFileIndex.get() >= startFileIndex) {
 +          result.add(fileStatus);
 +        }
 +        currentFileIndex.incrementAndGet();
 +        if (result.size() >= numResultFiles) {
 +          return;
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public StorageProperty getStorageProperty() {
 +    StorageProperty storageProperty = new StorageProperty();
 +    storageProperty.setSortedInsert(false);
 +    if (storeType == StoreType.RAW) {
 +      storageProperty.setSupportsInsertInto(false);
 +    } else {
 +      storageProperty.setSupportsInsertInto(true);
 +    }
 +
 +    return storageProperty;
 +  }
 +
 +  @Override
 +  public void closeStorageManager() {
 +  }
 +
 +  @Override
 +  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
 +  }
 +
 +  @Override
 +  public void rollbackOutputCommit(LogicalNode node) throws IOException {
 +  }
 +
 +  @Override
 +  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, 
TableDesc tableDesc,
 +                                          Schema inputSchema, SortSpec[] 
sortSpecs, TupleRange dataRange)
 +      throws IOException {
 +    return null;
 +  }
 +}

Reply via email to